Merge pull request #670 from stevvooe/remove-ipc
Remove half-baked Storage Driver IPC support
This commit is contained in:
commit
c56e28826e
15 changed files with 63 additions and 1031 deletions
|
@ -1,31 +0,0 @@
|
||||||
// +build ignore
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
|
||||||
"github.com/docker/distribution/registry/storage/driver/azure"
|
|
||||||
"github.com/docker/distribution/registry/storage/driver/ipc"
|
|
||||||
)
|
|
||||||
|
|
||||||
// An out-of-process Azure Storage driver, intended to be run by ipc.NewDriverClient
|
|
||||||
func main() {
|
|
||||||
parametersBytes := []byte(os.Args[1])
|
|
||||||
var parameters map[string]interface{}
|
|
||||||
err := json.Unmarshal(parametersBytes, ¶meters)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
driver, err := azure.FromParameters(parameters)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := ipc.StorageDriverServer(driver); err != nil {
|
|
||||||
log.Fatalln("driver error:", err)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,27 +0,0 @@
|
||||||
// +build ignore
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/docker/distribution/registry/storage/driver/filesystem"
|
|
||||||
"github.com/docker/distribution/registry/storage/driver/ipc"
|
|
||||||
)
|
|
||||||
|
|
||||||
// An out-of-process filesystem driver, intended to be run by ipc.NewDriverClient
|
|
||||||
func main() {
|
|
||||||
parametersBytes := []byte(os.Args[1])
|
|
||||||
var parameters map[string]string
|
|
||||||
err := json.Unmarshal(parametersBytes, ¶meters)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := ipc.StorageDriverServer(filesystem.FromParameters(parameters)); err != nil {
|
|
||||||
logrus.Fatalln(err)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,17 +0,0 @@
|
||||||
// +build ignore
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/Sirupsen/logrus"
|
|
||||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
|
||||||
"github.com/docker/distribution/registry/storage/driver/ipc"
|
|
||||||
)
|
|
||||||
|
|
||||||
// An out-of-process inmemory driver, intended to be run by ipc.NewDriverClient
|
|
||||||
// This exists primarily for example and testing purposes
|
|
||||||
func main() {
|
|
||||||
if err := ipc.StorageDriverServer(inmemory.New()); err != nil {
|
|
||||||
logrus.Fatalln(err)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,32 +0,0 @@
|
||||||
// +build ignore
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/docker/distribution/registry/storage/driver/ipc"
|
|
||||||
"github.com/docker/distribution/registry/storage/driver/s3"
|
|
||||||
)
|
|
||||||
|
|
||||||
// An out-of-process S3 driver, intended to be run by ipc.NewDriverClient
|
|
||||||
func main() {
|
|
||||||
parametersBytes := []byte(os.Args[1])
|
|
||||||
var parameters map[string]string
|
|
||||||
err := json.Unmarshal(parametersBytes, ¶meters)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
driver, err := s3.FromParameters(parameters)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := ipc.StorageDriverServer(driver); err != nil {
|
|
||||||
logrus.Fatalln(err)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -30,29 +30,30 @@ The storage driver API is designed to model a filesystem-like key/value storage
|
||||||
|
|
||||||
Storage drivers are required to implement the `storagedriver.StorageDriver` interface provided in `storagedriver.go`, which includes methods for reading, writing, and deleting content, as well as listing child objects of a specified prefix key.
|
Storage drivers are required to implement the `storagedriver.StorageDriver` interface provided in `storagedriver.go`, which includes methods for reading, writing, and deleting content, as well as listing child objects of a specified prefix key.
|
||||||
|
|
||||||
Storage drivers are intended (but not required) to be written in go, providing compile-time validation of the `storagedriver.StorageDriver` interface, although an IPC driver wrapper means that it is not required for drivers to be included in the compiled registry. The `storagedriver/ipc` package provides a client/server protocol for running storage drivers provided in external executables as a managed child server process.
|
Storage drivers are intended to be written in Go, providing compile-time
|
||||||
|
validation of the `storagedriver.StorageDriver` interface.
|
||||||
|
|
||||||
## Driver Selection and Configuration
|
## Driver Selection and Configuration
|
||||||
|
|
||||||
The preferred method of selecting a storage driver is using the `StorageDriverFactory` interface in the `storagedriver/factory` package. These factories provide a common interface for constructing storage drivers with a parameters map. The factory model is based off of the [Register](http://golang.org/pkg/database/sql/#Register) and [Open](http://golang.org/pkg/database/sql/#Open) methods in the builtin [database/sql](http://golang.org/pkg/database/sql) package.
|
The preferred method of selecting a storage driver is using the `StorageDriverFactory` interface in the `storagedriver/factory` package. These factories provide a common interface for constructing storage drivers with a parameters map. The factory model is based off of the [Register](http://golang.org/pkg/database/sql/#Register) and [Open](http://golang.org/pkg/database/sql/#Open) methods in the builtin [database/sql](http://golang.org/pkg/database/sql) package.
|
||||||
|
|
||||||
Storage driver factories may be registered by name using the `factory.Register` method, and then later invoked by calling `factory.Create` with a driver name and parameters map. If no driver is registered with the given name, this factory will attempt to find an executable storage driver with the executable name "registry-storage-\<driver name\>" and return an IPC storage driver wrapper managing the driver subprocess. If no such storage driver can be found, `factory.Create` will return an `InvalidStorageDriverError`.
|
Storage driver factories may be registered by name using the
|
||||||
|
`factory.Register` method, and then later invoked by calling `factory.Create`
|
||||||
|
with a driver name and parameters map. If no such storage driver can be found,
|
||||||
|
`factory.Create` will return an `InvalidStorageDriverError`.
|
||||||
|
|
||||||
## Driver Contribution
|
## Driver Contribution
|
||||||
|
|
||||||
### Writing new storage drivers
|
### Writing new storage drivers
|
||||||
To create a valid storage driver, one must implement the `storagedriver.StorageDriver` interface and make sure to expose this driver via the factory system and as a distributable IPC server executable.
|
To create a valid storage driver, one must implement the
|
||||||
|
`storagedriver.StorageDriver` interface and make sure to expose this driver
|
||||||
|
via the factory system.
|
||||||
|
|
||||||
#### In-process drivers
|
#### Registering
|
||||||
Storage drivers should call `factory.Register` with their driver name in an `init` method, allowing callers of `factory.New` to construct instances of this driver without requiring modification of imports throughout the codebase.
|
Storage drivers should call `factory.Register` with their driver name in an `init` method, allowing callers of `factory.New` to construct instances of this driver without requiring modification of imports throughout the codebase.
|
||||||
|
|
||||||
#### Out-of-process drivers
|
|
||||||
As many users will run the registry as a pre-constructed docker container, storage drivers should also be distributable as IPC server executables. Drivers written in go should model the main method provided in `registry/storage/driverfilesystem/driver.go`. Parameters to IPC drivers will be provided as a JSON-serialized map in the first argument to the process. These parameters should be validated and then a blocking call to `ipc.StorageDriverServer` should be made with a new storage driver.
|
|
||||||
|
|
||||||
Out-of-process drivers must also implement the `ipc.IPCStorageDriver` interface, which exposes a `Version` check for the storage driver. This is used to validate storage driver api compatibility at driver load-time.
|
|
||||||
|
|
||||||
## Testing
|
## Testing
|
||||||
Storage driver test suites are provided in `storagedriver/testsuites/testsuites.go` and may be used for any storage driver written in go. Two methods are provided for registering test suites, `RegisterInProcessSuite` and `RegisterIPCSuite`, which run the same set of tests for the driver imported or managed over IPC respectively.
|
Storage driver test suites are provided in
|
||||||
|
`storagedriver/testsuites/testsuites.go` and may be used for any storage
|
||||||
## Drivers written in other languages
|
driver written in Go. Tests can be registered using the `RegisterSuite`
|
||||||
Although storage drivers are strongly recommended to be written in go for consistency, compile-time validation, and support, the IPC framework allows for a level of language-agnosticism. Non-go drivers must implement the storage driver protocol by mimicing StorageDriverServer in `storagedriver/ipc/server.go`. As the IPC framework is a layer on top of [docker/libchan](https://github.com/docker/libchan), this currently limits language support to Java via [ndeloof/chan](https://github.com/ndeloof/jchan) and Javascript via [GraftJS/jschan](https://github.com/GraftJS/jschan), although contributions to the libchan project are welcome.
|
function, which run the same set of tests for any registered drivers.
|
||||||
|
|
|
@ -59,11 +59,5 @@ func init() {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
testsuites.RegisterInProcessSuite(azureDriverConstructor, skipCheck)
|
testsuites.RegisterSuite(azureDriverConstructor, skipCheck)
|
||||||
// testsuites.RegisterIPCSuite(driverName, map[string]string{
|
|
||||||
// paramAccountName: accountName,
|
|
||||||
// paramAccountKey: accountKey,
|
|
||||||
// paramContainer: container,
|
|
||||||
// paramRealm: realm,
|
|
||||||
// }, skipCheck)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,30 +33,14 @@ func Register(name string, factory StorageDriverFactory) {
|
||||||
driverFactories[name] = factory
|
driverFactories[name] = factory
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new storagedriver.StorageDriver with the given name and parameters
|
// Create a new storagedriver.StorageDriver with the given name and
|
||||||
// To run in-process, the StorageDriverFactory must first be registered with the given name
|
// parameters. To use a driver, the StorageDriverFactory must first be
|
||||||
// If no in-process drivers are found with the given name, this attempts to create an IPC driver
|
// registered with the given name. If no drivers are found, an
|
||||||
// If no in-process or external drivers are found, an InvalidStorageDriverError is returned
|
// InvalidStorageDriverError is returned
|
||||||
func Create(name string, parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
func Create(name string, parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||||
driverFactory, ok := driverFactories[name]
|
driverFactory, ok := driverFactories[name]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, InvalidStorageDriverError{name}
|
return nil, InvalidStorageDriverError{name}
|
||||||
|
|
||||||
// NOTE(stevvooe): We are disabling storagedriver ipc for now, as the
|
|
||||||
// server and client need to be updated for the changed API calls and
|
|
||||||
// there were some problems libchan hanging. We'll phase this
|
|
||||||
// functionality back in over the next few weeks.
|
|
||||||
|
|
||||||
// No registered StorageDriverFactory found, try ipc
|
|
||||||
// driverClient, err := ipc.NewDriverClient(name, parameters)
|
|
||||||
// if err != nil {
|
|
||||||
// return nil, InvalidStorageDriverError{name}
|
|
||||||
// }
|
|
||||||
// err = driverClient.Start()
|
|
||||||
// if err != nil {
|
|
||||||
// return nil, err
|
|
||||||
// }
|
|
||||||
// return driverClient, nil
|
|
||||||
}
|
}
|
||||||
return driverFactory.Create(parameters)
|
return driverFactory.Create(parameters)
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,10 +20,7 @@ func init() {
|
||||||
}
|
}
|
||||||
defer os.Remove(root)
|
defer os.Remove(root)
|
||||||
|
|
||||||
testsuites.RegisterInProcessSuite(func() (storagedriver.StorageDriver, error) {
|
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
|
||||||
return New(root), nil
|
return New(root), nil
|
||||||
}, testsuites.NeverSkip)
|
}, testsuites.NeverSkip)
|
||||||
|
|
||||||
// BUG(stevvooe): IPC is broken so we're disabling for now. Will revisit later.
|
|
||||||
// testsuites.RegisterIPCSuite(driverName, map[string]string{"rootdirectory": root}, testsuites.NeverSkip)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
|
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
"github.com/docker/distribution/registry/storage/driver/testsuites"
|
"github.com/docker/distribution/registry/storage/driver/testsuites"
|
||||||
|
|
||||||
"gopkg.in/check.v1"
|
"gopkg.in/check.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -16,9 +15,5 @@ func init() {
|
||||||
inmemoryDriverConstructor := func() (storagedriver.StorageDriver, error) {
|
inmemoryDriverConstructor := func() (storagedriver.StorageDriver, error) {
|
||||||
return New(), nil
|
return New(), nil
|
||||||
}
|
}
|
||||||
testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip)
|
testsuites.RegisterSuite(inmemoryDriverConstructor, testsuites.NeverSkip)
|
||||||
|
|
||||||
// BUG(stevvooe): Disable flaky IPC tests for now when we can troubleshoot
|
|
||||||
// the problems with libchan.
|
|
||||||
// testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,454 +0,0 @@
|
||||||
// +build ignore
|
|
||||||
|
|
||||||
package ipc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
"os/exec"
|
|
||||||
"syscall"
|
|
||||||
|
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
|
||||||
"github.com/docker/libchan"
|
|
||||||
"github.com/docker/libchan/spdy"
|
|
||||||
)
|
|
||||||
|
|
||||||
// StorageDriverExecutablePrefix is the prefix which the IPC storage driver
|
|
||||||
// loader expects driver executables to begin with. For example, the s3 driver
|
|
||||||
// should be named "registry-storagedriver-s3".
|
|
||||||
const StorageDriverExecutablePrefix = "registry-storagedriver-"
|
|
||||||
|
|
||||||
// StorageDriverClient is a storagedriver.StorageDriver implementation using a
|
|
||||||
// managed child process communicating over IPC using libchan with a unix domain
|
|
||||||
// socket
|
|
||||||
type StorageDriverClient struct {
|
|
||||||
subprocess *exec.Cmd
|
|
||||||
exitChan chan error
|
|
||||||
exitErr error
|
|
||||||
stopChan chan struct{}
|
|
||||||
socket *os.File
|
|
||||||
transport *spdy.Transport
|
|
||||||
sender libchan.Sender
|
|
||||||
version storagedriver.Version
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewDriverClient constructs a new out-of-process storage driver using the
|
|
||||||
// driver name and configuration parameters
|
|
||||||
// A user must call Start on this driver client before remote method calls can
|
|
||||||
// be made
|
|
||||||
//
|
|
||||||
// Looks for drivers in the following locations in order:
|
|
||||||
// - Storage drivers directory (to be determined, yet not implemented)
|
|
||||||
// - $GOPATH/bin
|
|
||||||
// - $PATH
|
|
||||||
func NewDriverClient(name string, parameters map[string]string) (*StorageDriverClient, error) {
|
|
||||||
paramsBytes, err := json.Marshal(parameters)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
driverExecName := StorageDriverExecutablePrefix + name
|
|
||||||
driverPath, err := exec.LookPath(driverExecName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
command := exec.Command(driverPath, string(paramsBytes))
|
|
||||||
|
|
||||||
return &StorageDriverClient{
|
|
||||||
subprocess: command,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start starts the designated child process storage driver and binds a socket
|
|
||||||
// to this process for IPC method calls
|
|
||||||
func (driver *StorageDriverClient) Start() error {
|
|
||||||
driver.exitErr = nil
|
|
||||||
driver.exitChan = make(chan error)
|
|
||||||
driver.stopChan = make(chan struct{})
|
|
||||||
|
|
||||||
fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
childSocket := os.NewFile(uintptr(fileDescriptors[0]), "childSocket")
|
|
||||||
driver.socket = os.NewFile(uintptr(fileDescriptors[1]), "parentSocket")
|
|
||||||
|
|
||||||
driver.subprocess.Stdout = os.Stdout
|
|
||||||
driver.subprocess.Stderr = os.Stderr
|
|
||||||
driver.subprocess.ExtraFiles = []*os.File{childSocket}
|
|
||||||
|
|
||||||
if err = driver.subprocess.Start(); err != nil {
|
|
||||||
driver.Stop()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
go driver.handleSubprocessExit()
|
|
||||||
|
|
||||||
if err = childSocket.Close(); err != nil {
|
|
||||||
driver.Stop()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
connection, err := net.FileConn(driver.socket)
|
|
||||||
if err != nil {
|
|
||||||
driver.Stop()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
driver.transport, err = spdy.NewClientTransport(connection)
|
|
||||||
if err != nil {
|
|
||||||
driver.Stop()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
driver.sender, err = driver.transport.NewSendChannel()
|
|
||||||
if err != nil {
|
|
||||||
driver.Stop()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check the driver's version to determine compatibility
|
|
||||||
receiver, remoteSender := libchan.Pipe()
|
|
||||||
err = driver.sender.Send(&Request{Type: "Version", ResponseChannel: remoteSender})
|
|
||||||
if err != nil {
|
|
||||||
driver.Stop()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var response VersionResponse
|
|
||||||
err = receiver.Receive(&response)
|
|
||||||
if err != nil {
|
|
||||||
driver.Stop()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.Error != nil {
|
|
||||||
return response.Error.Unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
driver.version = response.Version
|
|
||||||
|
|
||||||
if driver.version.Major() != storagedriver.CurrentVersion.Major() || driver.version.Minor() > storagedriver.CurrentVersion.Minor() {
|
|
||||||
return IncompatibleVersionError{driver.version}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop stops the child process storage driver
|
|
||||||
// storagedriver.StorageDriver methods called after Stop will fail
|
|
||||||
func (driver *StorageDriverClient) Stop() error {
|
|
||||||
var closeSenderErr, closeTransportErr, closeSocketErr, killErr error
|
|
||||||
|
|
||||||
if driver.sender != nil {
|
|
||||||
closeSenderErr = driver.sender.Close()
|
|
||||||
}
|
|
||||||
if driver.transport != nil {
|
|
||||||
closeTransportErr = driver.transport.Close()
|
|
||||||
}
|
|
||||||
if driver.socket != nil {
|
|
||||||
closeSocketErr = driver.socket.Close()
|
|
||||||
}
|
|
||||||
if driver.subprocess != nil {
|
|
||||||
killErr = driver.subprocess.Process.Kill()
|
|
||||||
}
|
|
||||||
if driver.stopChan != nil {
|
|
||||||
close(driver.stopChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
if closeSenderErr != nil {
|
|
||||||
return closeSenderErr
|
|
||||||
} else if closeTransportErr != nil {
|
|
||||||
return closeTransportErr
|
|
||||||
} else if closeSocketErr != nil {
|
|
||||||
return closeSocketErr
|
|
||||||
}
|
|
||||||
|
|
||||||
return killErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implement the storagedriver.StorageDriver interface over IPC
|
|
||||||
|
|
||||||
// GetContent retrieves the content stored at "path" as a []byte.
|
|
||||||
func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
|
|
||||||
if err := driver.exited(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
receiver, remoteSender := libchan.Pipe()
|
|
||||||
|
|
||||||
params := map[string]interface{}{"Path": path}
|
|
||||||
err := driver.sender.Send(&Request{Type: "GetContent", Parameters: params, ResponseChannel: remoteSender})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
response := new(ReadStreamResponse)
|
|
||||||
err = driver.receiveResponse(receiver, response)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.Error != nil {
|
|
||||||
return nil, response.Error.Unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
defer response.Reader.Close()
|
|
||||||
contents, err := ioutil.ReadAll(response.Reader)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return contents, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// PutContent stores the []byte content at a location designated by "path".
|
|
||||||
func (driver *StorageDriverClient) PutContent(path string, contents []byte) error {
|
|
||||||
if err := driver.exited(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
receiver, remoteSender := libchan.Pipe()
|
|
||||||
|
|
||||||
params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))}
|
|
||||||
err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
response := new(WriteStreamResponse)
|
|
||||||
err = driver.receiveResponse(receiver, response)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.Error != nil {
|
|
||||||
return response.Error.Unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
|
||||||
// given byte offset.
|
|
||||||
func (driver *StorageDriverClient) ReadStream(path string, offset int64) (io.ReadCloser, error) {
|
|
||||||
if err := driver.exited(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
receiver, remoteSender := libchan.Pipe()
|
|
||||||
params := map[string]interface{}{"Path": path, "Offset": offset}
|
|
||||||
err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
response := new(ReadStreamResponse)
|
|
||||||
err = driver.receiveResponse(receiver, response)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.Error != nil {
|
|
||||||
return nil, response.Error.Unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
return response.Reader, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteStream stores the contents of the provided io.ReadCloser at a location
|
|
||||||
// designated by the given path.
|
|
||||||
func (driver *StorageDriverClient) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
|
|
||||||
if err := driver.exited(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
receiver, remoteSender := libchan.Pipe()
|
|
||||||
params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": reader}
|
|
||||||
err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
response := new(WriteStreamResponse)
|
|
||||||
err = driver.receiveResponse(receiver, response)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.Error != nil {
|
|
||||||
return response.Error.Unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CurrentSize retrieves the curernt size in bytes of the object at the given
|
|
||||||
// path.
|
|
||||||
func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) {
|
|
||||||
if err := driver.exited(); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
receiver, remoteSender := libchan.Pipe()
|
|
||||||
params := map[string]interface{}{"Path": path}
|
|
||||||
err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender})
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
response := new(CurrentSizeResponse)
|
|
||||||
err = driver.receiveResponse(receiver, response)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.Error != nil {
|
|
||||||
return 0, response.Error.Unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
return response.Position, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// List returns a list of the objects that are direct descendants of the given
|
|
||||||
// path.
|
|
||||||
func (driver *StorageDriverClient) List(path string) ([]string, error) {
|
|
||||||
if err := driver.exited(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
receiver, remoteSender := libchan.Pipe()
|
|
||||||
params := map[string]interface{}{"Path": path}
|
|
||||||
err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
response := new(ListResponse)
|
|
||||||
err = driver.receiveResponse(receiver, response)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.Error != nil {
|
|
||||||
return nil, response.Error.Unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
return response.Keys, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Move moves an object stored at sourcePath to destPath, removing the original
|
|
||||||
// object.
|
|
||||||
func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error {
|
|
||||||
if err := driver.exited(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
receiver, remoteSender := libchan.Pipe()
|
|
||||||
params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath}
|
|
||||||
err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
response := new(MoveResponse)
|
|
||||||
err = driver.receiveResponse(receiver, response)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.Error != nil {
|
|
||||||
return response.Error.Unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
|
||||||
func (driver *StorageDriverClient) Delete(path string) error {
|
|
||||||
if err := driver.exited(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
receiver, remoteSender := libchan.Pipe()
|
|
||||||
params := map[string]interface{}{"Path": path}
|
|
||||||
err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
response := new(DeleteResponse)
|
|
||||||
err = driver.receiveResponse(receiver, response)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.Error != nil {
|
|
||||||
return response.Error.Unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleSubprocessExit populates the exit channel until we have explicitly
|
|
||||||
// stopped the storage driver subprocess
|
|
||||||
// Requests can select on driver.exitChan and response receiving and not hang if
|
|
||||||
// the process exits
|
|
||||||
func (driver *StorageDriverClient) handleSubprocessExit() {
|
|
||||||
exitErr := driver.subprocess.Wait()
|
|
||||||
if exitErr == nil {
|
|
||||||
exitErr = fmt.Errorf("Storage driver subprocess already exited cleanly")
|
|
||||||
} else {
|
|
||||||
exitErr = fmt.Errorf("Storage driver subprocess exited with error: %s", exitErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
driver.exitErr = exitErr
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case driver.exitChan <- exitErr:
|
|
||||||
case <-driver.stopChan:
|
|
||||||
close(driver.exitChan)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// receiveResponse populates the response value with the next result from the
|
|
||||||
// given receiver, or returns an error if receiving failed or the driver has
|
|
||||||
// stopped
|
|
||||||
func (driver *StorageDriverClient) receiveResponse(receiver libchan.Receiver, response interface{}) error {
|
|
||||||
receiveChan := make(chan error, 1)
|
|
||||||
go func(receiver libchan.Receiver, receiveChan chan<- error) {
|
|
||||||
receiveChan <- receiver.Receive(response)
|
|
||||||
}(receiver, receiveChan)
|
|
||||||
|
|
||||||
var err error
|
|
||||||
var ok bool
|
|
||||||
select {
|
|
||||||
case err = <-receiveChan:
|
|
||||||
case err, ok = <-driver.exitChan:
|
|
||||||
if !ok {
|
|
||||||
err = driver.exitErr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// exited returns an exit error if the driver has exited or nil otherwise
|
|
||||||
func (driver *StorageDriverClient) exited() error {
|
|
||||||
select {
|
|
||||||
case err, ok := <-driver.exitChan:
|
|
||||||
if !ok {
|
|
||||||
return driver.exitErr
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
default:
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,148 +0,0 @@
|
||||||
// +build ignore
|
|
||||||
|
|
||||||
package ipc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"reflect"
|
|
||||||
|
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
|
||||||
"github.com/docker/libchan"
|
|
||||||
)
|
|
||||||
|
|
||||||
// StorageDriver is the interface which IPC storage drivers must implement. As external storage
|
|
||||||
// drivers may be defined to use a different version of the storagedriver.StorageDriver interface,
|
|
||||||
// we use an additional version check to determine compatiblity.
|
|
||||||
type StorageDriver interface {
|
|
||||||
// Version returns the storagedriver.StorageDriver interface version which this storage driver
|
|
||||||
// implements, which is used to determine driver compatibility
|
|
||||||
Version() (storagedriver.Version, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IncompatibleVersionError is returned when a storage driver is using an incompatible version of
|
|
||||||
// the storagedriver.StorageDriver api
|
|
||||||
type IncompatibleVersionError struct {
|
|
||||||
version storagedriver.Version
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e IncompatibleVersionError) Error() string {
|
|
||||||
return fmt.Sprintf("Incompatible storage driver version: %s", e.version)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Request defines a remote method call request
|
|
||||||
// A return value struct is to be sent over the ResponseChannel
|
|
||||||
type Request struct {
|
|
||||||
Type string `codec:",omitempty"`
|
|
||||||
Parameters map[string]interface{} `codec:",omitempty"`
|
|
||||||
ResponseChannel libchan.Sender `codec:",omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResponseError is a serializable error type.
|
|
||||||
// The Type and Parameters may be used to reconstruct the same error on the
|
|
||||||
// client side, falling back to using the Type and Message if this cannot be
|
|
||||||
// done.
|
|
||||||
type ResponseError struct {
|
|
||||||
Type string `codec:",omitempty"`
|
|
||||||
Message string `codec:",omitempty"`
|
|
||||||
Parameters map[string]interface{} `codec:",omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// WrapError wraps an error in a serializable struct containing the error's type
|
|
||||||
// and message.
|
|
||||||
func WrapError(err error) *ResponseError {
|
|
||||||
if err == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
v := reflect.ValueOf(err)
|
|
||||||
re := ResponseError{
|
|
||||||
Type: v.Type().String(),
|
|
||||||
Message: err.Error(),
|
|
||||||
}
|
|
||||||
|
|
||||||
if v.Kind() == reflect.Struct {
|
|
||||||
re.Parameters = make(map[string]interface{})
|
|
||||||
for i := 0; i < v.NumField(); i++ {
|
|
||||||
field := v.Type().Field(i)
|
|
||||||
re.Parameters[field.Name] = v.Field(i).Interface()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &re
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unwrap returns the underlying error if it can be reconstructed, or the
|
|
||||||
// original ResponseError otherwise.
|
|
||||||
func (err *ResponseError) Unwrap() error {
|
|
||||||
var errVal reflect.Value
|
|
||||||
var zeroVal reflect.Value
|
|
||||||
|
|
||||||
switch err.Type {
|
|
||||||
case "storagedriver.PathNotFoundError":
|
|
||||||
errVal = reflect.ValueOf(&storagedriver.PathNotFoundError{})
|
|
||||||
case "storagedriver.InvalidOffsetError":
|
|
||||||
errVal = reflect.ValueOf(&storagedriver.InvalidOffsetError{})
|
|
||||||
}
|
|
||||||
if errVal == zeroVal {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for k, v := range err.Parameters {
|
|
||||||
fieldVal := errVal.Elem().FieldByName(k)
|
|
||||||
if fieldVal == zeroVal {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
fieldVal.Set(reflect.ValueOf(v))
|
|
||||||
}
|
|
||||||
|
|
||||||
if unwrapped, ok := errVal.Elem().Interface().(error); ok {
|
|
||||||
return unwrapped
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (err *ResponseError) Error() string {
|
|
||||||
return fmt.Sprintf("%s: %s", err.Type, err.Message)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IPC method call response object definitions
|
|
||||||
|
|
||||||
// VersionResponse is a response for a Version request
|
|
||||||
type VersionResponse struct {
|
|
||||||
Version storagedriver.Version `codec:",omitempty"`
|
|
||||||
Error *ResponseError `codec:",omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadStreamResponse is a response for a ReadStream request
|
|
||||||
type ReadStreamResponse struct {
|
|
||||||
Reader io.ReadCloser `codec:",omitempty"`
|
|
||||||
Error *ResponseError `codec:",omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteStreamResponse is a response for a WriteStream request
|
|
||||||
type WriteStreamResponse struct {
|
|
||||||
Error *ResponseError `codec:",omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// CurrentSizeResponse is a response for a CurrentSize request
|
|
||||||
type CurrentSizeResponse struct {
|
|
||||||
Position uint64 `codec:",omitempty"`
|
|
||||||
Error *ResponseError `codec:",omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListResponse is a response for a List request
|
|
||||||
type ListResponse struct {
|
|
||||||
Keys []string `codec:",omitempty"`
|
|
||||||
Error *ResponseError `codec:",omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// MoveResponse is a response for a Move request
|
|
||||||
type MoveResponse struct {
|
|
||||||
Error *ResponseError `codec:",omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteResponse is a response for a Delete request
|
|
||||||
type DeleteResponse struct {
|
|
||||||
Error *ResponseError `codec:",omitempty"`
|
|
||||||
}
|
|
|
@ -1,178 +0,0 @@
|
||||||
// +build ignore
|
|
||||||
|
|
||||||
package ipc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
"reflect"
|
|
||||||
|
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
|
||||||
"github.com/docker/libchan"
|
|
||||||
"github.com/docker/libchan/spdy"
|
|
||||||
)
|
|
||||||
|
|
||||||
// StorageDriverServer runs a new IPC server handling requests for the given
|
|
||||||
// storagedriver.StorageDriver
|
|
||||||
// This explicitly uses file descriptor 3 for IPC communication, as storage drivers are spawned in
|
|
||||||
// client.go
|
|
||||||
//
|
|
||||||
// To create a new out-of-process driver, create a main package which calls StorageDriverServer with
|
|
||||||
// a storagedriver.StorageDriver
|
|
||||||
func StorageDriverServer(driver storagedriver.StorageDriver) error {
|
|
||||||
childSocket := os.NewFile(3, "childSocket")
|
|
||||||
defer childSocket.Close()
|
|
||||||
conn, err := net.FileConn(childSocket)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
if transport, err := spdy.NewServerTransport(conn); err != nil {
|
|
||||||
panic(err)
|
|
||||||
} else {
|
|
||||||
for {
|
|
||||||
receiver, err := transport.WaitReceiveChannel()
|
|
||||||
if err == io.EOF {
|
|
||||||
return nil
|
|
||||||
} else if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
go receive(driver, receiver)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// receive receives new storagedriver.StorageDriver method requests and creates a new goroutine to
|
|
||||||
// handle each request
|
|
||||||
// Requests are expected to be of type ipc.Request as the parameters are unknown until the request
|
|
||||||
// type is deserialized
|
|
||||||
func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) {
|
|
||||||
for {
|
|
||||||
var request Request
|
|
||||||
err := receiver.Receive(&request)
|
|
||||||
if err == io.EOF {
|
|
||||||
return
|
|
||||||
} else if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
go handleRequest(driver, request)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleRequest handles storagedriver.StorageDriver method requests as defined in client.go
|
|
||||||
// Responds to requests using the Request.ResponseChannel
|
|
||||||
func handleRequest(driver storagedriver.StorageDriver, request Request) {
|
|
||||||
switch request.Type {
|
|
||||||
case "Version":
|
|
||||||
err := request.ResponseChannel.Send(&VersionResponse{Version: storagedriver.CurrentVersion})
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
case "GetContent":
|
|
||||||
path, _ := request.Parameters["Path"].(string)
|
|
||||||
content, err := driver.GetContent(path)
|
|
||||||
var response ReadStreamResponse
|
|
||||||
if err != nil {
|
|
||||||
response = ReadStreamResponse{Error: WrapError(err)}
|
|
||||||
} else {
|
|
||||||
response = ReadStreamResponse{Reader: ioutil.NopCloser(bytes.NewReader(content))}
|
|
||||||
}
|
|
||||||
err = request.ResponseChannel.Send(&response)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
case "PutContent":
|
|
||||||
path, _ := request.Parameters["Path"].(string)
|
|
||||||
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
|
|
||||||
contents, err := ioutil.ReadAll(reader)
|
|
||||||
defer reader.Close()
|
|
||||||
if err == nil {
|
|
||||||
err = driver.PutContent(path, contents)
|
|
||||||
}
|
|
||||||
response := WriteStreamResponse{
|
|
||||||
Error: WrapError(err),
|
|
||||||
}
|
|
||||||
err = request.ResponseChannel.Send(&response)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
case "ReadStream":
|
|
||||||
path, _ := request.Parameters["Path"].(string)
|
|
||||||
// Depending on serialization method, Offset may be converted to any int/uint type
|
|
||||||
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
|
|
||||||
reader, err := driver.ReadStream(path, offset)
|
|
||||||
var response ReadStreamResponse
|
|
||||||
if err != nil {
|
|
||||||
response = ReadStreamResponse{Error: WrapError(err)}
|
|
||||||
} else {
|
|
||||||
response = ReadStreamResponse{Reader: reader}
|
|
||||||
}
|
|
||||||
err = request.ResponseChannel.Send(&response)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
case "WriteStream":
|
|
||||||
path, _ := request.Parameters["Path"].(string)
|
|
||||||
// Depending on serialization method, Offset may be converted to any int/uint type
|
|
||||||
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
|
|
||||||
// Depending on serialization method, Size may be converted to any int/uint type
|
|
||||||
size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int()
|
|
||||||
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
|
|
||||||
err := driver.WriteStream(path, offset, size, reader)
|
|
||||||
response := WriteStreamResponse{
|
|
||||||
Error: WrapError(err),
|
|
||||||
}
|
|
||||||
err = request.ResponseChannel.Send(&response)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
case "CurrentSize":
|
|
||||||
path, _ := request.Parameters["Path"].(string)
|
|
||||||
position, err := driver.CurrentSize(path)
|
|
||||||
response := CurrentSizeResponse{
|
|
||||||
Position: position,
|
|
||||||
Error: WrapError(err),
|
|
||||||
}
|
|
||||||
err = request.ResponseChannel.Send(&response)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
case "List":
|
|
||||||
path, _ := request.Parameters["Path"].(string)
|
|
||||||
keys, err := driver.List(path)
|
|
||||||
response := ListResponse{
|
|
||||||
Keys: keys,
|
|
||||||
Error: WrapError(err),
|
|
||||||
}
|
|
||||||
err = request.ResponseChannel.Send(&response)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
case "Move":
|
|
||||||
sourcePath, _ := request.Parameters["SourcePath"].(string)
|
|
||||||
destPath, _ := request.Parameters["DestPath"].(string)
|
|
||||||
err := driver.Move(sourcePath, destPath)
|
|
||||||
response := MoveResponse{
|
|
||||||
Error: WrapError(err),
|
|
||||||
}
|
|
||||||
err = request.ResponseChannel.Send(&response)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
case "Delete":
|
|
||||||
path, _ := request.Parameters["Path"].(string)
|
|
||||||
err := driver.Delete(path)
|
|
||||||
response := DeleteResponse{
|
|
||||||
Error: WrapError(err),
|
|
||||||
}
|
|
||||||
err = request.ResponseChannel.Send(&response)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
panic(request)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -36,5 +36,5 @@ func init() {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
testsuites.RegisterInProcessSuite(driverConstructor, skipCheck)
|
testsuites.RegisterSuite(driverConstructor, skipCheck)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,8 @@ import (
|
||||||
// Hook up gocheck into the "go test" runner.
|
// Hook up gocheck into the "go test" runner.
|
||||||
func Test(t *testing.T) { check.TestingT(t) }
|
func Test(t *testing.T) { check.TestingT(t) }
|
||||||
|
|
||||||
type S3DriverConstructor func(rootDirectory string) (*Driver, error)
|
var s3DriverConstructor func(rootDirectory string) (*Driver, error)
|
||||||
|
var skipS3 func() string
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
accessKey := os.Getenv("AWS_ACCESS_KEY")
|
accessKey := os.Getenv("AWS_ACCESS_KEY")
|
||||||
|
@ -33,7 +34,7 @@ func init() {
|
||||||
}
|
}
|
||||||
defer os.Remove(root)
|
defer os.Remove(root)
|
||||||
|
|
||||||
s3DriverConstructor := func(rootDirectory string) (*Driver, error) {
|
s3DriverConstructor = func(rootDirectory string) (*Driver, error) {
|
||||||
encryptBool := false
|
encryptBool := false
|
||||||
if encrypt != "" {
|
if encrypt != "" {
|
||||||
encryptBool, err = strconv.ParseBool(encrypt)
|
encryptBool, err = strconv.ParseBool(encrypt)
|
||||||
|
@ -74,79 +75,64 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Skip S3 storage driver tests if environment variable parameters are not provided
|
// Skip S3 storage driver tests if environment variable parameters are not provided
|
||||||
skipCheck := func() string {
|
skipS3 = func() string {
|
||||||
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
|
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
|
||||||
return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests"
|
return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests"
|
||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
driverConstructor := func() (storagedriver.StorageDriver, error) {
|
testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
|
||||||
return s3DriverConstructor(root)
|
return s3DriverConstructor(root)
|
||||||
|
}, skipS3)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEmptyRootList(t *testing.T) {
|
||||||
|
if skipS3() != "" {
|
||||||
|
t.Skip(skipS3())
|
||||||
}
|
}
|
||||||
|
|
||||||
testsuites.RegisterInProcessSuite(driverConstructor, skipCheck)
|
|
||||||
|
|
||||||
// s3Constructor := func() (*Driver, error) {
|
|
||||||
// return s3DriverConstructor(aws.GetRegion(region))
|
|
||||||
// }
|
|
||||||
|
|
||||||
RegisterS3DriverSuite(s3DriverConstructor, skipCheck)
|
|
||||||
|
|
||||||
// testsuites.RegisterIPCSuite(driverName, map[string]string{
|
|
||||||
// "accesskey": accessKey,
|
|
||||||
// "secretkey": secretKey,
|
|
||||||
// "region": region.Name,
|
|
||||||
// "bucket": bucket,
|
|
||||||
// "encrypt": encrypt,
|
|
||||||
// }, skipCheck)
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
func RegisterS3DriverSuite(s3DriverConstructor S3DriverConstructor, skipCheck testsuites.SkipCheck) {
|
|
||||||
check.Suite(&S3DriverSuite{
|
|
||||||
Constructor: s3DriverConstructor,
|
|
||||||
SkipCheck: skipCheck,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
type S3DriverSuite struct {
|
|
||||||
Constructor S3DriverConstructor
|
|
||||||
testsuites.SkipCheck
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *S3DriverSuite) SetUpSuite(c *check.C) {
|
|
||||||
if reason := suite.SkipCheck(); reason != "" {
|
|
||||||
c.Skip(reason)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *S3DriverSuite) TestEmptyRootList(c *check.C) {
|
|
||||||
validRoot, err := ioutil.TempDir("", "driver-")
|
validRoot, err := ioutil.TempDir("", "driver-")
|
||||||
c.Assert(err, check.IsNil)
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating temporary directory: %v", err)
|
||||||
|
}
|
||||||
defer os.Remove(validRoot)
|
defer os.Remove(validRoot)
|
||||||
|
|
||||||
rootedDriver, err := suite.Constructor(validRoot)
|
rootedDriver, err := s3DriverConstructor(validRoot)
|
||||||
c.Assert(err, check.IsNil)
|
if err != nil {
|
||||||
emptyRootDriver, err := suite.Constructor("")
|
t.Fatalf("unexpected error creating rooted driver: %v", err)
|
||||||
c.Assert(err, check.IsNil)
|
}
|
||||||
slashRootDriver, err := suite.Constructor("/")
|
|
||||||
c.Assert(err, check.IsNil)
|
emptyRootDriver, err := s3DriverConstructor("")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating empty root driver: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
slashRootDriver, err := s3DriverConstructor("/")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating slash root driver: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
filename := "/test"
|
filename := "/test"
|
||||||
contents := []byte("contents")
|
contents := []byte("contents")
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
err = rootedDriver.PutContent(ctx, filename, contents)
|
err = rootedDriver.PutContent(ctx, filename, contents)
|
||||||
c.Assert(err, check.IsNil)
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error creating content: %v", err)
|
||||||
|
}
|
||||||
defer rootedDriver.Delete(ctx, filename)
|
defer rootedDriver.Delete(ctx, filename)
|
||||||
|
|
||||||
keys, err := emptyRootDriver.List(ctx, "/")
|
keys, err := emptyRootDriver.List(ctx, "/")
|
||||||
for _, path := range keys {
|
for _, path := range keys {
|
||||||
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
|
if !storagedriver.PathRegexp.MatchString(path) {
|
||||||
|
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
keys, err = slashRootDriver.List(ctx, "/")
|
keys, err = slashRootDriver.List(ctx, "/")
|
||||||
for _, path := range keys {
|
for _, path := range keys {
|
||||||
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
|
if !storagedriver.PathRegexp.MatchString(path) {
|
||||||
|
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,9 @@ import (
|
||||||
// Test hooks up gocheck into the "go test" runner.
|
// Test hooks up gocheck into the "go test" runner.
|
||||||
func Test(t *testing.T) { check.TestingT(t) }
|
func Test(t *testing.T) { check.TestingT(t) }
|
||||||
|
|
||||||
// RegisterInProcessSuite registers an in-process storage driver test suite with
|
// RegisterSuite registers an in-process storage driver test suite with
|
||||||
// the go test runner.
|
// the go test runner.
|
||||||
func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) {
|
func RegisterSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) {
|
||||||
check.Suite(&DriverSuite{
|
check.Suite(&DriverSuite{
|
||||||
Constructor: driverConstructor,
|
Constructor: driverConstructor,
|
||||||
SkipCheck: skipCheck,
|
SkipCheck: skipCheck,
|
||||||
|
@ -32,39 +32,6 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterIPCSuite registers a storage driver test suite which runs the named
|
|
||||||
// driver as a child process with the given parameters.
|
|
||||||
func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) {
|
|
||||||
panic("ipc testing is disabled for now")
|
|
||||||
|
|
||||||
// NOTE(stevvooe): IPC testing is disabled for now. Uncomment the code
|
|
||||||
// block before and remove the panic when we phase it back in.
|
|
||||||
|
|
||||||
// suite := &DriverSuite{
|
|
||||||
// Constructor: func() (storagedriver.StorageDriver, error) {
|
|
||||||
// d, err := ipc.NewDriverClient(driverName, ipcParams)
|
|
||||||
// if err != nil {
|
|
||||||
// return nil, err
|
|
||||||
// }
|
|
||||||
// err = d.Start()
|
|
||||||
// if err != nil {
|
|
||||||
// return nil, err
|
|
||||||
// }
|
|
||||||
// return d, nil
|
|
||||||
// },
|
|
||||||
// SkipCheck: skipCheck,
|
|
||||||
// }
|
|
||||||
// suite.Teardown = func() error {
|
|
||||||
// if suite.StorageDriver == nil {
|
|
||||||
// return nil
|
|
||||||
// }
|
|
||||||
|
|
||||||
// driverClient := suite.StorageDriver.(*ipc.StorageDriverClient)
|
|
||||||
// return driverClient.Stop()
|
|
||||||
// }
|
|
||||||
// check.Suite(suite)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SkipCheck is a function used to determine if a test suite should be skipped.
|
// SkipCheck is a function used to determine if a test suite should be skipped.
|
||||||
// If a SkipCheck returns a non-empty skip reason, the suite is skipped with
|
// If a SkipCheck returns a non-empty skip reason, the suite is skipped with
|
||||||
// the given reason.
|
// the given reason.
|
||||||
|
@ -82,9 +49,8 @@ type DriverConstructor func() (storagedriver.StorageDriver, error)
|
||||||
type DriverTeardown func() error
|
type DriverTeardown func() error
|
||||||
|
|
||||||
// DriverSuite is a gocheck test suite designed to test a
|
// DriverSuite is a gocheck test suite designed to test a
|
||||||
// storagedriver.StorageDriver.
|
// storagedriver.StorageDriver. The intended way to create a DriverSuite is
|
||||||
// The intended way to create a DriverSuite is with RegisterInProcessSuite or
|
// with RegisterSuite.
|
||||||
// RegisterIPCSuite.
|
|
||||||
type DriverSuite struct {
|
type DriverSuite struct {
|
||||||
Constructor DriverConstructor
|
Constructor DriverConstructor
|
||||||
Teardown DriverTeardown
|
Teardown DriverTeardown
|
||||||
|
@ -841,10 +807,6 @@ func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) {
|
||||||
// TestConcurrentFileStreams checks that multiple *os.File objects can be passed
|
// TestConcurrentFileStreams checks that multiple *os.File objects can be passed
|
||||||
// in to WriteStream concurrently without hanging.
|
// in to WriteStream concurrently without hanging.
|
||||||
func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) {
|
func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) {
|
||||||
// if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC {
|
|
||||||
// c.Skip("Need to fix out-of-process concurrency")
|
|
||||||
// }
|
|
||||||
|
|
||||||
numStreams := 32
|
numStreams := 32
|
||||||
|
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
|
|
Loading…
Reference in a new issue