forked from TrueCloudLab/distribution
a3481c5f1c
This only works for a specific whitelist of error types, which is currently all errors in the storagedriver package. Also improves storagedriver tests to enforce proper error types are returned
457 lines
11 KiB
Go
457 lines
11 KiB
Go
package ipc
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net"
|
|
"os"
|
|
"os/exec"
|
|
"syscall"
|
|
|
|
"github.com/docker/docker-registry/storagedriver"
|
|
"github.com/docker/libchan"
|
|
"github.com/docker/libchan/spdy"
|
|
)
|
|
|
|
// StorageDriverExecutablePrefix is the prefix which the IPC storage driver
|
|
// loader expects driver executables to begin with. For example, the s3 driver
|
|
// should be named "registry-storagedriver-s3".
|
|
const StorageDriverExecutablePrefix = "registry-storagedriver-"
|
|
|
|
// StorageDriverClient is a storagedriver.StorageDriver implementation using a
|
|
// managed child process communicating over IPC using libchan with a unix domain
|
|
// socket
|
|
type StorageDriverClient struct {
|
|
subprocess *exec.Cmd
|
|
exitChan chan error
|
|
exitErr error
|
|
stopChan chan struct{}
|
|
socket *os.File
|
|
transport *spdy.Transport
|
|
sender libchan.Sender
|
|
version storagedriver.Version
|
|
}
|
|
|
|
// NewDriverClient constructs a new out-of-process storage driver using the
|
|
// driver name and configuration parameters
|
|
// A user must call Start on this driver client before remote method calls can
|
|
// be made
|
|
//
|
|
// Looks for drivers in the following locations in order:
|
|
// - Storage drivers directory (to be determined, yet not implemented)
|
|
// - $GOPATH/bin
|
|
// - $PATH
|
|
func NewDriverClient(name string, parameters map[string]string) (*StorageDriverClient, error) {
|
|
paramsBytes, err := json.Marshal(parameters)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
driverExecName := StorageDriverExecutablePrefix + name
|
|
driverPath, err := exec.LookPath(driverExecName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
command := exec.Command(driverPath, string(paramsBytes))
|
|
|
|
return &StorageDriverClient{
|
|
subprocess: command,
|
|
}, nil
|
|
}
|
|
|
|
// Start starts the designated child process storage driver and binds a socket
|
|
// to this process for IPC method calls
|
|
func (driver *StorageDriverClient) Start() error {
|
|
driver.exitErr = nil
|
|
driver.exitChan = make(chan error)
|
|
driver.stopChan = make(chan struct{})
|
|
|
|
fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
childSocket := os.NewFile(uintptr(fileDescriptors[0]), "childSocket")
|
|
driver.socket = os.NewFile(uintptr(fileDescriptors[1]), "parentSocket")
|
|
|
|
driver.subprocess.Stdout = os.Stdout
|
|
driver.subprocess.Stderr = os.Stderr
|
|
driver.subprocess.ExtraFiles = []*os.File{childSocket}
|
|
|
|
if err = driver.subprocess.Start(); err != nil {
|
|
driver.Stop()
|
|
return err
|
|
}
|
|
|
|
go driver.handleSubprocessExit()
|
|
|
|
if err = childSocket.Close(); err != nil {
|
|
driver.Stop()
|
|
return err
|
|
}
|
|
|
|
connection, err := net.FileConn(driver.socket)
|
|
if err != nil {
|
|
driver.Stop()
|
|
return err
|
|
}
|
|
driver.transport, err = spdy.NewClientTransport(connection)
|
|
if err != nil {
|
|
driver.Stop()
|
|
return err
|
|
}
|
|
driver.sender, err = driver.transport.NewSendChannel()
|
|
if err != nil {
|
|
driver.Stop()
|
|
return err
|
|
}
|
|
|
|
// Check the driver's version to determine compatibility
|
|
receiver, remoteSender := libchan.Pipe()
|
|
err = driver.sender.Send(&Request{Type: "Version", ResponseChannel: remoteSender})
|
|
if err != nil {
|
|
driver.Stop()
|
|
return err
|
|
}
|
|
|
|
var response VersionResponse
|
|
err = receiver.Receive(&response)
|
|
if err != nil {
|
|
driver.Stop()
|
|
return err
|
|
}
|
|
|
|
if response.Error != nil {
|
|
return response.Error.Unwrap()
|
|
}
|
|
|
|
driver.version = response.Version
|
|
|
|
if driver.version.Major() != storagedriver.CurrentVersion.Major() || driver.version.Minor() > storagedriver.CurrentVersion.Minor() {
|
|
return IncompatibleVersionError{driver.version}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop stops the child process storage driver
|
|
// storagedriver.StorageDriver methods called after Stop will fail
|
|
func (driver *StorageDriverClient) Stop() error {
|
|
var closeSenderErr, closeTransportErr, closeSocketErr, killErr error
|
|
|
|
if driver.sender != nil {
|
|
closeSenderErr = driver.sender.Close()
|
|
}
|
|
if driver.transport != nil {
|
|
closeTransportErr = driver.transport.Close()
|
|
}
|
|
if driver.socket != nil {
|
|
closeSocketErr = driver.socket.Close()
|
|
}
|
|
if driver.subprocess != nil {
|
|
killErr = driver.subprocess.Process.Kill()
|
|
}
|
|
if driver.stopChan != nil {
|
|
driver.stopChan <- struct{}{}
|
|
close(driver.stopChan)
|
|
}
|
|
|
|
if closeSenderErr != nil {
|
|
return closeSenderErr
|
|
} else if closeTransportErr != nil {
|
|
return closeTransportErr
|
|
} else if closeSocketErr != nil {
|
|
return closeSocketErr
|
|
}
|
|
|
|
return killErr
|
|
}
|
|
|
|
// Implement the storagedriver.StorageDriver interface over IPC
|
|
|
|
// GetContent retrieves the content stored at "path" as a []byte.
|
|
func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
|
|
if err := driver.exited(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
receiver, remoteSender := libchan.Pipe()
|
|
|
|
params := map[string]interface{}{"Path": path}
|
|
err := driver.sender.Send(&Request{Type: "GetContent", Parameters: params, ResponseChannel: remoteSender})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response := new(ReadStreamResponse)
|
|
err = driver.receiveResponse(receiver, response)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if response.Error != nil {
|
|
return nil, response.Error.Unwrap()
|
|
}
|
|
|
|
defer response.Reader.Close()
|
|
contents, err := ioutil.ReadAll(response.Reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return contents, nil
|
|
}
|
|
|
|
// PutContent stores the []byte content at a location designated by "path".
|
|
func (driver *StorageDriverClient) PutContent(path string, contents []byte) error {
|
|
if err := driver.exited(); err != nil {
|
|
return err
|
|
}
|
|
|
|
receiver, remoteSender := libchan.Pipe()
|
|
|
|
params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))}
|
|
err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
response := new(WriteStreamResponse)
|
|
err = driver.receiveResponse(receiver, response)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if response.Error != nil {
|
|
return response.Error.Unwrap()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
|
// given byte offset.
|
|
func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
|
|
if err := driver.exited(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
receiver, remoteSender := libchan.Pipe()
|
|
params := map[string]interface{}{"Path": path, "Offset": offset}
|
|
err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response := new(ReadStreamResponse)
|
|
err = driver.receiveResponse(receiver, response)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if response.Error != nil {
|
|
return nil, response.Error.Unwrap()
|
|
}
|
|
|
|
return response.Reader, nil
|
|
}
|
|
|
|
// WriteStream stores the contents of the provided io.ReadCloser at a location
|
|
// designated by the given path.
|
|
func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
|
|
if err := driver.exited(); err != nil {
|
|
return err
|
|
}
|
|
|
|
receiver, remoteSender := libchan.Pipe()
|
|
params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": ioutil.NopCloser(reader)}
|
|
err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
response := new(WriteStreamResponse)
|
|
err = driver.receiveResponse(receiver, response)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if response.Error != nil {
|
|
return response.Error.Unwrap()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CurrentSize retrieves the curernt size in bytes of the object at the given
|
|
// path.
|
|
func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) {
|
|
if err := driver.exited(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
receiver, remoteSender := libchan.Pipe()
|
|
params := map[string]interface{}{"Path": path}
|
|
err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender})
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
response := new(CurrentSizeResponse)
|
|
err = driver.receiveResponse(receiver, response)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if response.Error != nil {
|
|
return 0, response.Error.Unwrap()
|
|
}
|
|
|
|
return response.Position, nil
|
|
}
|
|
|
|
// List returns a list of the objects that are direct descendants of the given
|
|
// path.
|
|
func (driver *StorageDriverClient) List(path string) ([]string, error) {
|
|
if err := driver.exited(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
receiver, remoteSender := libchan.Pipe()
|
|
params := map[string]interface{}{"Path": path}
|
|
err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response := new(ListResponse)
|
|
err = driver.receiveResponse(receiver, response)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if response.Error != nil {
|
|
return nil, response.Error.Unwrap()
|
|
}
|
|
|
|
return response.Keys, nil
|
|
}
|
|
|
|
// Move moves an object stored at sourcePath to destPath, removing the original
|
|
// object.
|
|
func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error {
|
|
if err := driver.exited(); err != nil {
|
|
return err
|
|
}
|
|
|
|
receiver, remoteSender := libchan.Pipe()
|
|
params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath}
|
|
err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
response := new(MoveResponse)
|
|
err = driver.receiveResponse(receiver, response)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if response.Error != nil {
|
|
return response.Error.Unwrap()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
|
func (driver *StorageDriverClient) Delete(path string) error {
|
|
if err := driver.exited(); err != nil {
|
|
return err
|
|
}
|
|
|
|
receiver, remoteSender := libchan.Pipe()
|
|
params := map[string]interface{}{"Path": path}
|
|
err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
response := new(DeleteResponse)
|
|
err = driver.receiveResponse(receiver, response)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if response.Error != nil {
|
|
return response.Error.Unwrap()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// handleSubprocessExit populates the exit channel until we have explicitly
|
|
// stopped the storage driver subprocess
|
|
// Requests can select on driver.exitChan and response receiving and not hang if
|
|
// the process exits
|
|
func (driver *StorageDriverClient) handleSubprocessExit() {
|
|
exitErr := driver.subprocess.Wait()
|
|
if exitErr == nil {
|
|
exitErr = fmt.Errorf("Storage driver subprocess already exited cleanly")
|
|
} else {
|
|
exitErr = fmt.Errorf("Storage driver subprocess exited with error: %s", exitErr)
|
|
}
|
|
|
|
driver.exitErr = exitErr
|
|
|
|
for {
|
|
select {
|
|
case driver.exitChan <- exitErr:
|
|
case <-driver.stopChan:
|
|
close(driver.exitChan)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// receiveResponse populates the response value with the next result from the
|
|
// given receiver, or returns an error if receiving failed or the driver has
|
|
// stopped
|
|
func (driver *StorageDriverClient) receiveResponse(receiver libchan.Receiver, response interface{}) error {
|
|
receiveChan := make(chan error, 1)
|
|
go func(receiveChan chan<- error) {
|
|
defer close(receiveChan)
|
|
receiveChan <- receiver.Receive(response)
|
|
}(receiveChan)
|
|
|
|
var err error
|
|
var ok bool
|
|
select {
|
|
case err = <-receiveChan:
|
|
case err, ok = <-driver.exitChan:
|
|
go func(receiveChan <-chan error) {
|
|
<-receiveChan
|
|
}(receiveChan)
|
|
if !ok {
|
|
err = driver.exitErr
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// exited returns an exit error if the driver has exited or nil otherwise
|
|
func (driver *StorageDriverClient) exited() error {
|
|
select {
|
|
case err, ok := <-driver.exitChan:
|
|
if !ok {
|
|
return driver.exitErr
|
|
}
|
|
return err
|
|
default:
|
|
return nil
|
|
}
|
|
}
|