Remove half-baked Storage Driver IPC support

This removes documentation and code related to IPC based storage driver
plugins. The existence of this functionality was an original feature goal but
is now not maintained and actively confusing incoming contributions. We will
likely explore some driver plugin mechanism in the future but we don't need
this laying around in the meantime.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
pull/670/head
Stephen J Day 2015-06-29 16:39:45 -07:00
parent 3830c87469
commit d3d4423ff7
15 changed files with 63 additions and 1031 deletions

View File

@ -1,31 +0,0 @@
// +build ignore
package main
import (
"encoding/json"
"os"
log "github.com/Sirupsen/logrus"
"github.com/docker/distribution/registry/storage/driver/azure"
"github.com/docker/distribution/registry/storage/driver/ipc"
)
// An out-of-process Azure Storage driver, intended to be run by ipc.NewDriverClient
func main() {
parametersBytes := []byte(os.Args[1])
var parameters map[string]interface{}
err := json.Unmarshal(parametersBytes, &parameters)
if err != nil {
panic(err)
}
driver, err := azure.FromParameters(parameters)
if err != nil {
panic(err)
}
if err := ipc.StorageDriverServer(driver); err != nil {
log.Fatalln("driver error:", err)
}
}

View File

@ -1,27 +0,0 @@
// +build ignore
package main
import (
"encoding/json"
"os"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/registry/storage/driver/filesystem"
"github.com/docker/distribution/registry/storage/driver/ipc"
)
// An out-of-process filesystem driver, intended to be run by ipc.NewDriverClient
func main() {
parametersBytes := []byte(os.Args[1])
var parameters map[string]string
err := json.Unmarshal(parametersBytes, &parameters)
if err != nil {
panic(err)
}
if err := ipc.StorageDriverServer(filesystem.FromParameters(parameters)); err != nil {
logrus.Fatalln(err)
}
}

View File

@ -1,17 +0,0 @@
// +build ignore
package main
import (
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/docker/distribution/registry/storage/driver/ipc"
)
// An out-of-process inmemory driver, intended to be run by ipc.NewDriverClient
// This exists primarily for example and testing purposes
func main() {
if err := ipc.StorageDriverServer(inmemory.New()); err != nil {
logrus.Fatalln(err)
}
}

View File

@ -1,32 +0,0 @@
// +build ignore
package main
import (
"encoding/json"
"os"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/registry/storage/driver/ipc"
"github.com/docker/distribution/registry/storage/driver/s3"
)
// An out-of-process S3 driver, intended to be run by ipc.NewDriverClient
func main() {
parametersBytes := []byte(os.Args[1])
var parameters map[string]string
err := json.Unmarshal(parametersBytes, &parameters)
if err != nil {
panic(err)
}
driver, err := s3.FromParameters(parameters)
if err != nil {
panic(err)
}
if err := ipc.StorageDriverServer(driver); err != nil {
logrus.Fatalln(err)
}
}

View File

@ -30,29 +30,30 @@ The storage driver API is designed to model a filesystem-like key/value storage
Storage drivers are required to implement the `storagedriver.StorageDriver` interface provided in `storagedriver.go`, which includes methods for reading, writing, and deleting content, as well as listing child objects of a specified prefix key.
Storage drivers are intended (but not required) to be written in go, providing compile-time validation of the `storagedriver.StorageDriver` interface, although an IPC driver wrapper means that it is not required for drivers to be included in the compiled registry. The `storagedriver/ipc` package provides a client/server protocol for running storage drivers provided in external executables as a managed child server process.
Storage drivers are intended to be written in Go, providing compile-time
validation of the `storagedriver.StorageDriver` interface.
## Driver Selection and Configuration
The preferred method of selecting a storage driver is using the `StorageDriverFactory` interface in the `storagedriver/factory` package. These factories provide a common interface for constructing storage drivers with a parameters map. The factory model is based off of the [Register](http://golang.org/pkg/database/sql/#Register) and [Open](http://golang.org/pkg/database/sql/#Open) methods in the builtin [database/sql](http://golang.org/pkg/database/sql) package.
Storage driver factories may be registered by name using the `factory.Register` method, and then later invoked by calling `factory.Create` with a driver name and parameters map. If no driver is registered with the given name, this factory will attempt to find an executable storage driver with the executable name "registry-storage-\<driver name\>" and return an IPC storage driver wrapper managing the driver subprocess. If no such storage driver can be found, `factory.Create` will return an `InvalidStorageDriverError`.
Storage driver factories may be registered by name using the
`factory.Register` method, and then later invoked by calling `factory.Create`
with a driver name and parameters map. If no such storage driver can be found,
`factory.Create` will return an `InvalidStorageDriverError`.
## Driver Contribution
### Writing new storage drivers
To create a valid storage driver, one must implement the `storagedriver.StorageDriver` interface and make sure to expose this driver via the factory system and as a distributable IPC server executable.
To create a valid storage driver, one must implement the
`storagedriver.StorageDriver` interface and make sure to expose this driver
via the factory system.
#### In-process drivers
#### Registering
Storage drivers should call `factory.Register` with their driver name in an `init` method, allowing callers of `factory.New` to construct instances of this driver without requiring modification of imports throughout the codebase.
#### Out-of-process drivers
As many users will run the registry as a pre-constructed docker container, storage drivers should also be distributable as IPC server executables. Drivers written in go should model the main method provided in `registry/storage/driverfilesystem/driver.go`. Parameters to IPC drivers will be provided as a JSON-serialized map in the first argument to the process. These parameters should be validated and then a blocking call to `ipc.StorageDriverServer` should be made with a new storage driver.
Out-of-process drivers must also implement the `ipc.IPCStorageDriver` interface, which exposes a `Version` check for the storage driver. This is used to validate storage driver api compatibility at driver load-time.
## Testing
Storage driver test suites are provided in `storagedriver/testsuites/testsuites.go` and may be used for any storage driver written in go. Two methods are provided for registering test suites, `RegisterInProcessSuite` and `RegisterIPCSuite`, which run the same set of tests for the driver imported or managed over IPC respectively.
## Drivers written in other languages
Although storage drivers are strongly recommended to be written in go for consistency, compile-time validation, and support, the IPC framework allows for a level of language-agnosticism. Non-go drivers must implement the storage driver protocol by mimicing StorageDriverServer in `storagedriver/ipc/server.go`. As the IPC framework is a layer on top of [docker/libchan](https://github.com/docker/libchan), this currently limits language support to Java via [ndeloof/chan](https://github.com/ndeloof/jchan) and Javascript via [GraftJS/jschan](https://github.com/GraftJS/jschan), although contributions to the libchan project are welcome.
Storage driver test suites are provided in
`storagedriver/testsuites/testsuites.go` and may be used for any storage
driver written in Go. Tests can be registered using the `RegisterSuite`
function, which run the same set of tests for any registered drivers.

View File

@ -59,11 +59,5 @@ func init() {
return ""
}
testsuites.RegisterInProcessSuite(azureDriverConstructor, skipCheck)
// testsuites.RegisterIPCSuite(driverName, map[string]string{
// paramAccountName: accountName,
// paramAccountKey: accountKey,
// paramContainer: container,
// paramRealm: realm,
// }, skipCheck)
testsuites.RegisterSuite(azureDriverConstructor, skipCheck)
}

View File

@ -33,30 +33,14 @@ func Register(name string, factory StorageDriverFactory) {
driverFactories[name] = factory
}
// Create a new storagedriver.StorageDriver with the given name and parameters
// To run in-process, the StorageDriverFactory must first be registered with the given name
// If no in-process drivers are found with the given name, this attempts to create an IPC driver
// If no in-process or external drivers are found, an InvalidStorageDriverError is returned
// Create a new storagedriver.StorageDriver with the given name and
// parameters. To use a driver, the StorageDriverFactory must first be
// registered with the given name. If no drivers are found, an
// InvalidStorageDriverError is returned
func Create(name string, parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
driverFactory, ok := driverFactories[name]
if !ok {
return nil, InvalidStorageDriverError{name}
// NOTE(stevvooe): We are disabling storagedriver ipc for now, as the
// server and client need to be updated for the changed API calls and
// there were some problems libchan hanging. We'll phase this
// functionality back in over the next few weeks.
// No registered StorageDriverFactory found, try ipc
// driverClient, err := ipc.NewDriverClient(name, parameters)
// if err != nil {
// return nil, InvalidStorageDriverError{name}
// }
// err = driverClient.Start()
// if err != nil {
// return nil, err
// }
// return driverClient, nil
}
return driverFactory.Create(parameters)
}

View File

@ -20,10 +20,7 @@ func init() {
}
defer os.Remove(root)
testsuites.RegisterInProcessSuite(func() (storagedriver.StorageDriver, error) {
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
return New(root), nil
}, testsuites.NeverSkip)
// BUG(stevvooe): IPC is broken so we're disabling for now. Will revisit later.
// testsuites.RegisterIPCSuite(driverName, map[string]string{"rootdirectory": root}, testsuites.NeverSkip)
}

View File

@ -5,7 +5,6 @@ import (
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/testsuites"
"gopkg.in/check.v1"
)
@ -16,9 +15,5 @@ func init() {
inmemoryDriverConstructor := func() (storagedriver.StorageDriver, error) {
return New(), nil
}
testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip)
// BUG(stevvooe): Disable flaky IPC tests for now when we can troubleshoot
// the problems with libchan.
// testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip)
testsuites.RegisterSuite(inmemoryDriverConstructor, testsuites.NeverSkip)
}

View File

@ -1,454 +0,0 @@
// +build ignore
package ipc
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"os/exec"
"syscall"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/libchan"
"github.com/docker/libchan/spdy"
)
// StorageDriverExecutablePrefix is the prefix which the IPC storage driver
// loader expects driver executables to begin with. For example, the s3 driver
// should be named "registry-storagedriver-s3".
const StorageDriverExecutablePrefix = "registry-storagedriver-"
// StorageDriverClient is a storagedriver.StorageDriver implementation using a
// managed child process communicating over IPC using libchan with a unix domain
// socket
type StorageDriverClient struct {
subprocess *exec.Cmd
exitChan chan error
exitErr error
stopChan chan struct{}
socket *os.File
transport *spdy.Transport
sender libchan.Sender
version storagedriver.Version
}
// NewDriverClient constructs a new out-of-process storage driver using the
// driver name and configuration parameters
// A user must call Start on this driver client before remote method calls can
// be made
//
// Looks for drivers in the following locations in order:
// - Storage drivers directory (to be determined, yet not implemented)
// - $GOPATH/bin
// - $PATH
func NewDriverClient(name string, parameters map[string]string) (*StorageDriverClient, error) {
paramsBytes, err := json.Marshal(parameters)
if err != nil {
return nil, err
}
driverExecName := StorageDriverExecutablePrefix + name
driverPath, err := exec.LookPath(driverExecName)
if err != nil {
return nil, err
}
command := exec.Command(driverPath, string(paramsBytes))
return &StorageDriverClient{
subprocess: command,
}, nil
}
// Start starts the designated child process storage driver and binds a socket
// to this process for IPC method calls
func (driver *StorageDriverClient) Start() error {
driver.exitErr = nil
driver.exitChan = make(chan error)
driver.stopChan = make(chan struct{})
fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0)
if err != nil {
return err
}
childSocket := os.NewFile(uintptr(fileDescriptors[0]), "childSocket")
driver.socket = os.NewFile(uintptr(fileDescriptors[1]), "parentSocket")
driver.subprocess.Stdout = os.Stdout
driver.subprocess.Stderr = os.Stderr
driver.subprocess.ExtraFiles = []*os.File{childSocket}
if err = driver.subprocess.Start(); err != nil {
driver.Stop()
return err
}
go driver.handleSubprocessExit()
if err = childSocket.Close(); err != nil {
driver.Stop()
return err
}
connection, err := net.FileConn(driver.socket)
if err != nil {
driver.Stop()
return err
}
driver.transport, err = spdy.NewClientTransport(connection)
if err != nil {
driver.Stop()
return err
}
driver.sender, err = driver.transport.NewSendChannel()
if err != nil {
driver.Stop()
return err
}
// Check the driver's version to determine compatibility
receiver, remoteSender := libchan.Pipe()
err = driver.sender.Send(&Request{Type: "Version", ResponseChannel: remoteSender})
if err != nil {
driver.Stop()
return err
}
var response VersionResponse
err = receiver.Receive(&response)
if err != nil {
driver.Stop()
return err
}
if response.Error != nil {
return response.Error.Unwrap()
}
driver.version = response.Version
if driver.version.Major() != storagedriver.CurrentVersion.Major() || driver.version.Minor() > storagedriver.CurrentVersion.Minor() {
return IncompatibleVersionError{driver.version}
}
return nil
}
// Stop stops the child process storage driver
// storagedriver.StorageDriver methods called after Stop will fail
func (driver *StorageDriverClient) Stop() error {
var closeSenderErr, closeTransportErr, closeSocketErr, killErr error
if driver.sender != nil {
closeSenderErr = driver.sender.Close()
}
if driver.transport != nil {
closeTransportErr = driver.transport.Close()
}
if driver.socket != nil {
closeSocketErr = driver.socket.Close()
}
if driver.subprocess != nil {
killErr = driver.subprocess.Process.Kill()
}
if driver.stopChan != nil {
close(driver.stopChan)
}
if closeSenderErr != nil {
return closeSenderErr
} else if closeTransportErr != nil {
return closeTransportErr
} else if closeSocketErr != nil {
return closeSocketErr
}
return killErr
}
// Implement the storagedriver.StorageDriver interface over IPC
// GetContent retrieves the content stored at "path" as a []byte.
func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
if err := driver.exited(); err != nil {
return nil, err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "GetContent", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return nil, err
}
response := new(ReadStreamResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return nil, err
}
if response.Error != nil {
return nil, response.Error.Unwrap()
}
defer response.Reader.Close()
contents, err := ioutil.ReadAll(response.Reader)
if err != nil {
return nil, err
}
return contents, nil
}
// PutContent stores the []byte content at a location designated by "path".
func (driver *StorageDriverClient) PutContent(path string, contents []byte) error {
if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))}
err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
response := new(WriteStreamResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return err
}
if response.Error != nil {
return response.Error.Unwrap()
}
return nil
}
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (driver *StorageDriverClient) ReadStream(path string, offset int64) (io.ReadCloser, error) {
if err := driver.exited(); err != nil {
return nil, err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Offset": offset}
err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return nil, err
}
response := new(ReadStreamResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return nil, err
}
if response.Error != nil {
return nil, response.Error.Unwrap()
}
return response.Reader, nil
}
// WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path.
func (driver *StorageDriverClient) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": reader}
err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
response := new(WriteStreamResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return err
}
if response.Error != nil {
return response.Error.Unwrap()
}
return nil
}
// CurrentSize retrieves the curernt size in bytes of the object at the given
// path.
func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) {
if err := driver.exited(); err != nil {
return 0, err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return 0, err
}
response := new(CurrentSizeResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return 0, err
}
if response.Error != nil {
return 0, response.Error.Unwrap()
}
return response.Position, nil
}
// List returns a list of the objects that are direct descendants of the given
// path.
func (driver *StorageDriverClient) List(path string) ([]string, error) {
if err := driver.exited(); err != nil {
return nil, err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return nil, err
}
response := new(ListResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return nil, err
}
if response.Error != nil {
return nil, response.Error.Unwrap()
}
return response.Keys, nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error {
if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath}
err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
response := new(MoveResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return err
}
if response.Error != nil {
return response.Error.Unwrap()
}
return nil
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (driver *StorageDriverClient) Delete(path string) error {
if err := driver.exited(); err != nil {
return err
}
receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path}
err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender})
if err != nil {
return err
}
response := new(DeleteResponse)
err = driver.receiveResponse(receiver, response)
if err != nil {
return err
}
if response.Error != nil {
return response.Error.Unwrap()
}
return nil
}
// handleSubprocessExit populates the exit channel until we have explicitly
// stopped the storage driver subprocess
// Requests can select on driver.exitChan and response receiving and not hang if
// the process exits
func (driver *StorageDriverClient) handleSubprocessExit() {
exitErr := driver.subprocess.Wait()
if exitErr == nil {
exitErr = fmt.Errorf("Storage driver subprocess already exited cleanly")
} else {
exitErr = fmt.Errorf("Storage driver subprocess exited with error: %s", exitErr)
}
driver.exitErr = exitErr
for {
select {
case driver.exitChan <- exitErr:
case <-driver.stopChan:
close(driver.exitChan)
return
}
}
}
// receiveResponse populates the response value with the next result from the
// given receiver, or returns an error if receiving failed or the driver has
// stopped
func (driver *StorageDriverClient) receiveResponse(receiver libchan.Receiver, response interface{}) error {
receiveChan := make(chan error, 1)
go func(receiver libchan.Receiver, receiveChan chan<- error) {
receiveChan <- receiver.Receive(response)
}(receiver, receiveChan)
var err error
var ok bool
select {
case err = <-receiveChan:
case err, ok = <-driver.exitChan:
if !ok {
err = driver.exitErr
}
}
return err
}
// exited returns an exit error if the driver has exited or nil otherwise
func (driver *StorageDriverClient) exited() error {
select {
case err, ok := <-driver.exitChan:
if !ok {
return driver.exitErr
}
return err
default:
return nil
}
}

View File

@ -1,148 +0,0 @@
// +build ignore
package ipc
import (
"fmt"
"io"
"reflect"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/libchan"
)
// StorageDriver is the interface which IPC storage drivers must implement. As external storage
// drivers may be defined to use a different version of the storagedriver.StorageDriver interface,
// we use an additional version check to determine compatiblity.
type StorageDriver interface {
// Version returns the storagedriver.StorageDriver interface version which this storage driver
// implements, which is used to determine driver compatibility
Version() (storagedriver.Version, error)
}
// IncompatibleVersionError is returned when a storage driver is using an incompatible version of
// the storagedriver.StorageDriver api
type IncompatibleVersionError struct {
version storagedriver.Version
}
func (e IncompatibleVersionError) Error() string {
return fmt.Sprintf("Incompatible storage driver version: %s", e.version)
}
// Request defines a remote method call request
// A return value struct is to be sent over the ResponseChannel
type Request struct {
Type string `codec:",omitempty"`
Parameters map[string]interface{} `codec:",omitempty"`
ResponseChannel libchan.Sender `codec:",omitempty"`
}
// ResponseError is a serializable error type.
// The Type and Parameters may be used to reconstruct the same error on the
// client side, falling back to using the Type and Message if this cannot be
// done.
type ResponseError struct {
Type string `codec:",omitempty"`
Message string `codec:",omitempty"`
Parameters map[string]interface{} `codec:",omitempty"`
}
// WrapError wraps an error in a serializable struct containing the error's type
// and message.
func WrapError(err error) *ResponseError {
if err == nil {
return nil
}
v := reflect.ValueOf(err)
re := ResponseError{
Type: v.Type().String(),
Message: err.Error(),
}
if v.Kind() == reflect.Struct {
re.Parameters = make(map[string]interface{})
for i := 0; i < v.NumField(); i++ {
field := v.Type().Field(i)
re.Parameters[field.Name] = v.Field(i).Interface()
}
}
return &re
}
// Unwrap returns the underlying error if it can be reconstructed, or the
// original ResponseError otherwise.
func (err *ResponseError) Unwrap() error {
var errVal reflect.Value
var zeroVal reflect.Value
switch err.Type {
case "storagedriver.PathNotFoundError":
errVal = reflect.ValueOf(&storagedriver.PathNotFoundError{})
case "storagedriver.InvalidOffsetError":
errVal = reflect.ValueOf(&storagedriver.InvalidOffsetError{})
}
if errVal == zeroVal {
return err
}
for k, v := range err.Parameters {
fieldVal := errVal.Elem().FieldByName(k)
if fieldVal == zeroVal {
return err
}
fieldVal.Set(reflect.ValueOf(v))
}
if unwrapped, ok := errVal.Elem().Interface().(error); ok {
return unwrapped
}
return err
}
func (err *ResponseError) Error() string {
return fmt.Sprintf("%s: %s", err.Type, err.Message)
}
// IPC method call response object definitions
// VersionResponse is a response for a Version request
type VersionResponse struct {
Version storagedriver.Version `codec:",omitempty"`
Error *ResponseError `codec:",omitempty"`
}
// ReadStreamResponse is a response for a ReadStream request
type ReadStreamResponse struct {
Reader io.ReadCloser `codec:",omitempty"`
Error *ResponseError `codec:",omitempty"`
}
// WriteStreamResponse is a response for a WriteStream request
type WriteStreamResponse struct {
Error *ResponseError `codec:",omitempty"`
}
// CurrentSizeResponse is a response for a CurrentSize request
type CurrentSizeResponse struct {
Position uint64 `codec:",omitempty"`
Error *ResponseError `codec:",omitempty"`
}
// ListResponse is a response for a List request
type ListResponse struct {
Keys []string `codec:",omitempty"`
Error *ResponseError `codec:",omitempty"`
}
// MoveResponse is a response for a Move request
type MoveResponse struct {
Error *ResponseError `codec:",omitempty"`
}
// DeleteResponse is a response for a Delete request
type DeleteResponse struct {
Error *ResponseError `codec:",omitempty"`
}

View File

@ -1,178 +0,0 @@
// +build ignore
package ipc
import (
"bytes"
"io"
"io/ioutil"
"net"
"os"
"reflect"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/libchan"
"github.com/docker/libchan/spdy"
)
// StorageDriverServer runs a new IPC server handling requests for the given
// storagedriver.StorageDriver
// This explicitly uses file descriptor 3 for IPC communication, as storage drivers are spawned in
// client.go
//
// To create a new out-of-process driver, create a main package which calls StorageDriverServer with
// a storagedriver.StorageDriver
func StorageDriverServer(driver storagedriver.StorageDriver) error {
childSocket := os.NewFile(3, "childSocket")
defer childSocket.Close()
conn, err := net.FileConn(childSocket)
if err != nil {
panic(err)
}
defer conn.Close()
if transport, err := spdy.NewServerTransport(conn); err != nil {
panic(err)
} else {
for {
receiver, err := transport.WaitReceiveChannel()
if err == io.EOF {
return nil
} else if err != nil {
panic(err)
}
go receive(driver, receiver)
}
}
}
// receive receives new storagedriver.StorageDriver method requests and creates a new goroutine to
// handle each request
// Requests are expected to be of type ipc.Request as the parameters are unknown until the request
// type is deserialized
func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) {
for {
var request Request
err := receiver.Receive(&request)
if err == io.EOF {
return
} else if err != nil {
panic(err)
}
go handleRequest(driver, request)
}
}
// handleRequest handles storagedriver.StorageDriver method requests as defined in client.go
// Responds to requests using the Request.ResponseChannel
func handleRequest(driver storagedriver.StorageDriver, request Request) {
switch request.Type {
case "Version":
err := request.ResponseChannel.Send(&VersionResponse{Version: storagedriver.CurrentVersion})
if err != nil {
panic(err)
}
case "GetContent":
path, _ := request.Parameters["Path"].(string)
content, err := driver.GetContent(path)
var response ReadStreamResponse
if err != nil {
response = ReadStreamResponse{Error: WrapError(err)}
} else {
response = ReadStreamResponse{Reader: ioutil.NopCloser(bytes.NewReader(content))}
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "PutContent":
path, _ := request.Parameters["Path"].(string)
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
contents, err := ioutil.ReadAll(reader)
defer reader.Close()
if err == nil {
err = driver.PutContent(path, contents)
}
response := WriteStreamResponse{
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "ReadStream":
path, _ := request.Parameters["Path"].(string)
// Depending on serialization method, Offset may be converted to any int/uint type
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
reader, err := driver.ReadStream(path, offset)
var response ReadStreamResponse
if err != nil {
response = ReadStreamResponse{Error: WrapError(err)}
} else {
response = ReadStreamResponse{Reader: reader}
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "WriteStream":
path, _ := request.Parameters["Path"].(string)
// Depending on serialization method, Offset may be converted to any int/uint type
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
// Depending on serialization method, Size may be converted to any int/uint type
size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int()
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
err := driver.WriteStream(path, offset, size, reader)
response := WriteStreamResponse{
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "CurrentSize":
path, _ := request.Parameters["Path"].(string)
position, err := driver.CurrentSize(path)
response := CurrentSizeResponse{
Position: position,
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "List":
path, _ := request.Parameters["Path"].(string)
keys, err := driver.List(path)
response := ListResponse{
Keys: keys,
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "Move":
sourcePath, _ := request.Parameters["SourcePath"].(string)
destPath, _ := request.Parameters["DestPath"].(string)
err := driver.Move(sourcePath, destPath)
response := MoveResponse{
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
case "Delete":
path, _ := request.Parameters["Path"].(string)
err := driver.Delete(path)
response := DeleteResponse{
Error: WrapError(err),
}
err = request.ResponseChannel.Send(&response)
if err != nil {
panic(err)
}
default:
panic(request)
}
}

View File

@ -36,5 +36,5 @@ func init() {
return ""
}
testsuites.RegisterInProcessSuite(driverConstructor, skipCheck)
testsuites.RegisterSuite(driverConstructor, skipCheck)
}

View File

@ -17,7 +17,8 @@ import (
// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { check.TestingT(t) }
type S3DriverConstructor func(rootDirectory string) (*Driver, error)
var s3DriverConstructor func(rootDirectory string) (*Driver, error)
var skipS3 func() string
func init() {
accessKey := os.Getenv("AWS_ACCESS_KEY")
@ -33,7 +34,7 @@ func init() {
}
defer os.Remove(root)
s3DriverConstructor := func(rootDirectory string) (*Driver, error) {
s3DriverConstructor = func(rootDirectory string) (*Driver, error) {
encryptBool := false
if encrypt != "" {
encryptBool, err = strconv.ParseBool(encrypt)
@ -74,79 +75,64 @@ func init() {
}
// Skip S3 storage driver tests if environment variable parameters are not provided
skipCheck := func() string {
skipS3 = func() string {
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests"
}
return ""
}
driverConstructor := func() (storagedriver.StorageDriver, error) {
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
return s3DriverConstructor(root)
}, skipS3)
}
func TestEmptyRootList(t *testing.T) {
if skipS3() != "" {
t.Skip(skipS3())
}
testsuites.RegisterInProcessSuite(driverConstructor, skipCheck)
// s3Constructor := func() (*Driver, error) {
// return s3DriverConstructor(aws.GetRegion(region))
// }
RegisterS3DriverSuite(s3DriverConstructor, skipCheck)
// testsuites.RegisterIPCSuite(driverName, map[string]string{
// "accesskey": accessKey,
// "secretkey": secretKey,
// "region": region.Name,
// "bucket": bucket,
// "encrypt": encrypt,
// }, skipCheck)
// }
}
func RegisterS3DriverSuite(s3DriverConstructor S3DriverConstructor, skipCheck testsuites.SkipCheck) {
check.Suite(&S3DriverSuite{
Constructor: s3DriverConstructor,
SkipCheck: skipCheck,
})
}
type S3DriverSuite struct {
Constructor S3DriverConstructor
testsuites.SkipCheck
}
func (suite *S3DriverSuite) SetUpSuite(c *check.C) {
if reason := suite.SkipCheck(); reason != "" {
c.Skip(reason)
}
}
func (suite *S3DriverSuite) TestEmptyRootList(c *check.C) {
validRoot, err := ioutil.TempDir("", "driver-")
c.Assert(err, check.IsNil)
if err != nil {
t.Fatalf("unexpected error creating temporary directory: %v", err)
}
defer os.Remove(validRoot)
rootedDriver, err := suite.Constructor(validRoot)
c.Assert(err, check.IsNil)
emptyRootDriver, err := suite.Constructor("")
c.Assert(err, check.IsNil)
slashRootDriver, err := suite.Constructor("/")
c.Assert(err, check.IsNil)
rootedDriver, err := s3DriverConstructor(validRoot)
if err != nil {
t.Fatalf("unexpected error creating rooted driver: %v", err)
}
emptyRootDriver, err := s3DriverConstructor("")
if err != nil {
t.Fatalf("unexpected error creating empty root driver: %v", err)
}
slashRootDriver, err := s3DriverConstructor("/")
if err != nil {
t.Fatalf("unexpected error creating slash root driver: %v", err)
}
filename := "/test"
contents := []byte("contents")
ctx := context.Background()
err = rootedDriver.PutContent(ctx, filename, contents)
c.Assert(err, check.IsNil)
if err != nil {
t.Fatalf("unexpected error creating content: %v", err)
}
defer rootedDriver.Delete(ctx, filename)
keys, err := emptyRootDriver.List(ctx, "/")
for _, path := range keys {
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
if !storagedriver.PathRegexp.MatchString(path) {
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
}
}
keys, err = slashRootDriver.List(ctx, "/")
for _, path := range keys {
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
if !storagedriver.PathRegexp.MatchString(path) {
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
}
}
}

View File

@ -22,9 +22,9 @@ import (
// Test hooks up gocheck into the "go test" runner.
func Test(t *testing.T) { check.TestingT(t) }
// RegisterInProcessSuite registers an in-process storage driver test suite with
// RegisterSuite registers an in-process storage driver test suite with
// the go test runner.
func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) {
func RegisterSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) {
check.Suite(&DriverSuite{
Constructor: driverConstructor,
SkipCheck: skipCheck,
@ -32,39 +32,6 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC
})
}
// RegisterIPCSuite registers a storage driver test suite which runs the named
// driver as a child process with the given parameters.
func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) {
panic("ipc testing is disabled for now")
// NOTE(stevvooe): IPC testing is disabled for now. Uncomment the code
// block before and remove the panic when we phase it back in.
// suite := &DriverSuite{
// Constructor: func() (storagedriver.StorageDriver, error) {
// d, err := ipc.NewDriverClient(driverName, ipcParams)
// if err != nil {
// return nil, err
// }
// err = d.Start()
// if err != nil {
// return nil, err
// }
// return d, nil
// },
// SkipCheck: skipCheck,
// }
// suite.Teardown = func() error {
// if suite.StorageDriver == nil {
// return nil
// }
// driverClient := suite.StorageDriver.(*ipc.StorageDriverClient)
// return driverClient.Stop()
// }
// check.Suite(suite)
}
// SkipCheck is a function used to determine if a test suite should be skipped.
// If a SkipCheck returns a non-empty skip reason, the suite is skipped with
// the given reason.
@ -82,9 +49,8 @@ type DriverConstructor func() (storagedriver.StorageDriver, error)
type DriverTeardown func() error
// DriverSuite is a gocheck test suite designed to test a
// storagedriver.StorageDriver.
// The intended way to create a DriverSuite is with RegisterInProcessSuite or
// RegisterIPCSuite.
// storagedriver.StorageDriver. The intended way to create a DriverSuite is
// with RegisterSuite.
type DriverSuite struct {
Constructor DriverConstructor
Teardown DriverTeardown
@ -841,10 +807,6 @@ func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) {
// TestConcurrentFileStreams checks that multiple *os.File objects can be passed
// in to WriteStream concurrently without hanging.
func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) {
// if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC {
// c.Skip("Need to fix out-of-process concurrency")
// }
numStreams := 32
if testing.Short() {