Merge pull request #1695 from tonyhb/add-regulator-to-filesystem

Add regulator to filesystem
This commit is contained in:
Richard Scothern 2016-05-04 10:05:51 -07:00
commit c047d34b22
6 changed files with 330 additions and 15 deletions

View file

@ -78,6 +78,7 @@ information about each option that appears later in this page.
storage: storage:
filesystem: filesystem:
rootdirectory: /var/lib/registry rootdirectory: /var/lib/registry
maxthreads: 100
azure: azure:
accountname: accountname accountname: accountname
accountkey: base64encodedaccountkey accountkey: base64encodedaccountkey

View file

@ -16,3 +16,7 @@ An implementation of the `storagedriver.StorageDriver` interface which uses the
`rootdirectory`: (optional) The absolute path to a root directory tree in which `rootdirectory`: (optional) The absolute path to a root directory tree in which
to store all registry files. The registry stores all its data here so make sure to store all registry files. The registry stores all its data here so make sure
there is adequate space available. Defaults to `/var/lib/registry`. there is adequate space available. Defaults to `/var/lib/registry`.
`maxthreads`: (optional) The maximum number of simultaneous blocking filesystem
operations permitted within the registry. Each operation spawns a new thread and
may cause thread exhaustion issues if many are done in parallel. Defaults to
`100`, and can be no lower than `25`.

View file

@ -132,8 +132,15 @@ func makeTestEnv(t *testing.T, name string) *testEnv {
t.Fatalf("unable to create tempdir: %s", err) t.Fatalf("unable to create tempdir: %s", err)
} }
localDriver, err := filesystem.FromParameters(map[string]interface{}{
"rootdirectory": truthDir,
})
if err != nil {
t.Fatalf("unable to create filesystem driver: %s", err)
}
// todo: create a tempfile area here // todo: create a tempfile area here
localRegistry, err := storage.NewRegistry(ctx, filesystem.New(truthDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption) localRegistry, err := storage.NewRegistry(ctx, localDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption)
if err != nil { if err != nil {
t.Fatalf("error creating registry: %v", err) t.Fatalf("error creating registry: %v", err)
} }
@ -142,7 +149,14 @@ func makeTestEnv(t *testing.T, name string) *testEnv {
t.Fatalf("unexpected error getting repo: %v", err) t.Fatalf("unexpected error getting repo: %v", err)
} }
truthRegistry, err := storage.NewRegistry(ctx, filesystem.New(cacheDir), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider())) cacheDriver, err := filesystem.FromParameters(map[string]interface{}{
"rootdirectory": cacheDir,
})
if err != nil {
t.Fatalf("unable to create filesystem driver: %s", err)
}
truthRegistry, err := storage.NewRegistry(ctx, cacheDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()))
if err != nil { if err != nil {
t.Fatalf("error creating registry: %v", err) t.Fatalf("error creating registry: %v", err)
} }

View file

@ -0,0 +1,145 @@
package base
import (
"io"
"sync"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
type regulator struct {
storagedriver.StorageDriver
*sync.Cond
available uint64
}
// NewRegulator wraps the given driver and is used to regulate concurrent calls
// to the given storage driver to a maximum of the given limit. This is useful
// for storage drivers that would otherwise create an unbounded number of OS
// threads if allowed to be called unregulated.
func NewRegulator(driver storagedriver.StorageDriver, limit uint64) storagedriver.StorageDriver {
return &regulator{
StorageDriver: driver,
Cond: sync.NewCond(&sync.Mutex{}),
available: limit,
}
}
func (r *regulator) enter() {
r.L.Lock()
for r.available == 0 {
r.Wait()
}
r.available--
r.L.Unlock()
}
func (r *regulator) exit() {
r.L.Lock()
// We only need to signal to a waiting FS operation if we're already at the
// limit of threads used
if r.available == 0 {
r.Signal()
}
r.available++
r.L.Unlock()
}
// Name returns the human-readable "name" of the driver, useful in error
// messages and logging. By convention, this will just be the registration
// name, but drivers may provide other information here.
func (r *regulator) Name() string {
r.enter()
defer r.exit()
return r.StorageDriver.Name()
}
// GetContent retrieves the content stored at "path" as a []byte.
// This should primarily be used for small objects.
func (r *regulator) GetContent(ctx context.Context, path string) ([]byte, error) {
r.enter()
defer r.exit()
return r.StorageDriver.GetContent(ctx, path)
}
// PutContent stores the []byte content at a location designated by "path".
// This should primarily be used for small objects.
func (r *regulator) PutContent(ctx context.Context, path string, content []byte) error {
r.enter()
defer r.exit()
return r.StorageDriver.PutContent(ctx, path, content)
}
// Reader retrieves an io.ReadCloser for the content stored at "path"
// with a given byte offset.
// May be used to resume reading a stream by providing a nonzero offset.
func (r *regulator) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
r.enter()
defer r.exit()
return r.StorageDriver.Reader(ctx, path, offset)
}
// Writer stores the contents of the provided io.ReadCloser at a
// location designated by the given path.
// May be used to resume writing a stream by providing a nonzero offset.
// The offset must be no larger than the CurrentSize for this path.
func (r *regulator) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
r.enter()
defer r.exit()
return r.StorageDriver.Writer(ctx, path, append)
}
// Stat retrieves the FileInfo for the given path, including the current
// size in bytes and the creation time.
func (r *regulator) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
r.enter()
defer r.exit()
return r.StorageDriver.Stat(ctx, path)
}
// List returns a list of the objects that are direct descendants of the
//given path.
func (r *regulator) List(ctx context.Context, path string) ([]string, error) {
r.enter()
defer r.exit()
return r.StorageDriver.List(ctx, path)
}
// Move moves an object stored at sourcePath to destPath, removing the
// original object.
// Note: This may be no more efficient than a copy followed by a delete for
// many implementations.
func (r *regulator) Move(ctx context.Context, sourcePath string, destPath string) error {
r.enter()
defer r.exit()
return r.StorageDriver.Move(ctx, sourcePath, destPath)
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (r *regulator) Delete(ctx context.Context, path string) error {
r.enter()
defer r.exit()
return r.StorageDriver.Delete(ctx, path)
}
// URLFor returns a URL which may be used to retrieve the content stored at
// the given path, possibly using the given options.
// May return an ErrUnsupportedMethod in certain StorageDriver
// implementations.
func (r *regulator) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
r.enter()
defer r.exit()
return r.StorageDriver.URLFor(ctx, path, options)
}

View file

@ -8,6 +8,8 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
"reflect"
"strconv"
"time" "time"
"github.com/docker/distribution/context" "github.com/docker/distribution/context"
@ -16,8 +18,23 @@ import (
"github.com/docker/distribution/registry/storage/driver/factory" "github.com/docker/distribution/registry/storage/driver/factory"
) )
const driverName = "filesystem" const (
const defaultRootDirectory = "/var/lib/registry" driverName = "filesystem"
defaultRootDirectory = "/var/lib/registry"
defaultMaxThreads = uint64(100)
// minThreads is the minimum value for the maxthreads configuration
// parameter. If the driver's parameters are less than this we set
// the parameters to minThreads
minThreads = uint64(25)
)
// DriverParameters represents all configuration options available for the
// filesystem driver
type DriverParameters struct {
RootDirectory string
MaxThreads uint64
}
func init() { func init() {
factory.Register(driverName, &filesystemDriverFactory{}) factory.Register(driverName, &filesystemDriverFactory{})
@ -27,7 +44,7 @@ func init() {
type filesystemDriverFactory struct{} type filesystemDriverFactory struct{}
func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return FromParameters(parameters), nil return FromParameters(parameters)
} }
type driver struct { type driver struct {
@ -47,25 +64,72 @@ type Driver struct {
// FromParameters constructs a new Driver with a given parameters map // FromParameters constructs a new Driver with a given parameters map
// Optional Parameters: // Optional Parameters:
// - rootdirectory // - rootdirectory
func FromParameters(parameters map[string]interface{}) *Driver { // - maxthreads
var rootDirectory = defaultRootDirectory func FromParameters(parameters map[string]interface{}) (*Driver, error) {
params, err := fromParametersImpl(parameters)
if err != nil || params == nil {
return nil, err
}
return New(*params), nil
}
func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) {
var (
err error
maxThreads = defaultMaxThreads
rootDirectory = defaultRootDirectory
)
if parameters != nil { if parameters != nil {
rootDir, ok := parameters["rootdirectory"] if rootDir, ok := parameters["rootdirectory"]; ok {
if ok {
rootDirectory = fmt.Sprint(rootDir) rootDirectory = fmt.Sprint(rootDir)
} }
// Get maximum number of threads for blocking filesystem operations,
// if specified
threads := parameters["maxthreads"]
switch v := threads.(type) {
case string:
if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil {
return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads)
}
case uint64:
maxThreads = v
case int, int32, int64:
val := reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int()
// If threads is negative casting to uint64 will wrap around and
// give you the hugest thread limit ever. Let's be sensible, here
if val > 0 {
maxThreads = uint64(val)
}
case uint, uint32:
maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint()
case nil:
// do nothing
default:
return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads)
}
if maxThreads < minThreads {
maxThreads = minThreads
}
} }
return New(rootDirectory)
params := &DriverParameters{
RootDirectory: rootDirectory,
MaxThreads: maxThreads,
}
return params, nil
} }
// New constructs a new Driver with a given rootDirectory // New constructs a new Driver with a given rootDirectory
func New(rootDirectory string) *Driver { func New(params DriverParameters) *Driver {
fsDriver := &driver{rootDirectory: params.RootDirectory}
return &Driver{ return &Driver{
baseEmbed: baseEmbed{ baseEmbed: baseEmbed{
Base: base.Base{ Base: base.Base{
StorageDriver: &driver{ StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads),
rootDirectory: rootDirectory,
},
}, },
}, },
} }

View file

@ -3,6 +3,7 @@ package filesystem
import ( import (
"io/ioutil" "io/ioutil"
"os" "os"
"reflect"
"testing" "testing"
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
@ -20,7 +21,93 @@ func init() {
} }
defer os.Remove(root) defer os.Remove(root)
driver, err := FromParameters(map[string]interface{}{
"rootdirectory": root,
})
if err != nil {
panic(err)
}
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) { testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
return New(root), nil return driver, nil
}, testsuites.NeverSkip) }, testsuites.NeverSkip)
} }
func TestFromParametersImpl(t *testing.T) {
tests := []struct {
params map[string]interface{} // techincally the yaml can contain anything
expected DriverParameters
pass bool
}{
// check we use default threads and root dirs
{
params: map[string]interface{}{},
expected: DriverParameters{
RootDirectory: defaultRootDirectory,
MaxThreads: defaultMaxThreads,
},
pass: true,
},
// Testing initiation with a string maxThreads which can't be parsed
{
params: map[string]interface{}{
"maxthreads": "fail",
},
expected: DriverParameters{},
pass: false,
},
{
params: map[string]interface{}{
"maxthreads": "100",
},
expected: DriverParameters{
RootDirectory: defaultRootDirectory,
MaxThreads: uint64(100),
},
pass: true,
},
{
params: map[string]interface{}{
"maxthreads": 100,
},
expected: DriverParameters{
RootDirectory: defaultRootDirectory,
MaxThreads: uint64(100),
},
pass: true,
},
// check that we use minimum thread counts
{
params: map[string]interface{}{
"maxthreads": 1,
},
expected: DriverParameters{
RootDirectory: defaultRootDirectory,
MaxThreads: minThreads,
},
pass: true,
},
}
for _, item := range tests {
params, err := fromParametersImpl(item.params)
if !item.pass {
// We only need to assert that expected failures have an error
if err == nil {
t.Fatalf("expected error configuring filesystem driver with invalid param: %+v", item.params)
}
continue
}
if err != nil {
t.Fatalf("unexpected error creating filesystem driver: %s", err)
}
// Note that we get a pointer to params back
if !reflect.DeepEqual(*params, item.expected) {
t.Fatalf("unexpected params from filesystem driver. expected %+v, got %+v", item.expected, params)
}
}
}