From 31df62064d58c3702730489b14f06add80d23597 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Thu, 6 Nov 2014 14:06:16 -0800 Subject: [PATCH] Adds logic for tracking ipc storage driver process status This allows requests to not hang if the child process exits --- storagedriver/ipc/client.go | 169 +++++++++++++++++++++++++++++------- 1 file changed, 138 insertions(+), 31 deletions(-) diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go index ad4fe5ee..7f41081a 100644 --- a/storagedriver/ipc/client.go +++ b/storagedriver/ipc/client.go @@ -3,6 +3,7 @@ package ipc import ( "bytes" "encoding/json" + "fmt" "io" "io/ioutil" "net" @@ -15,23 +16,29 @@ import ( "github.com/docker/libchan/spdy" ) -// StorageDriverExecutablePrefix is the prefix which the IPC storage driver loader expects driver -// executables to begin with. For example, the s3 driver should be named "registry-storage-s3". +// StorageDriverExecutablePrefix is the prefix which the IPC storage driver +// loader expects driver executables to begin with. For example, the s3 driver +// should be named "registry-storagedriver-s3". const StorageDriverExecutablePrefix = "registry-storagedriver-" -// StorageDriverClient is a storagedriver.StorageDriver implementation using a managed child process -// communicating over IPC using libchan with a unix domain socket +// StorageDriverClient is a storagedriver.StorageDriver implementation using a +// managed child process communicating over IPC using libchan with a unix domain +// socket type StorageDriverClient struct { subprocess *exec.Cmd + exitChan chan error + exitErr error + stopChan chan struct{} socket *os.File transport *spdy.Transport sender libchan.Sender version storagedriver.Version } -// NewDriverClient constructs a new out-of-process storage driver using the driver name and -// configuration parameters -// A user must call Start on this driver client before remote method calls can be made +// NewDriverClient constructs a new out-of-process storage driver using the +// driver name and configuration parameters +// A user must call Start on this driver client before remote method calls can +// be made // // Looks for drivers in the following locations in order: // - Storage drivers directory (to be determined, yet not implemented) @@ -56,9 +63,13 @@ func NewDriverClient(name string, parameters map[string]string) (*StorageDriverC }, nil } -// Start starts the designated child process storage driver and binds a socket to this process for -// IPC method calls +// Start starts the designated child process storage driver and binds a socket +// to this process for IPC method calls func (driver *StorageDriverClient) Start() error { + driver.exitErr = nil + driver.exitChan = make(chan error) + driver.stopChan = make(chan struct{}) + fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0) if err != nil { return err @@ -76,6 +87,8 @@ func (driver *StorageDriverClient) Start() error { return err } + go driver.handleSubprocessExit() + if err = childSocket.Close(); err != nil { driver.Stop() return err @@ -142,6 +155,10 @@ func (driver *StorageDriverClient) Stop() error { if driver.subprocess != nil { killErr = driver.subprocess.Process.Kill() } + if driver.stopChan != nil { + driver.stopChan <- struct{}{} + close(driver.stopChan) + } if closeSenderErr != nil { return closeSenderErr @@ -150,12 +167,17 @@ func (driver *StorageDriverClient) Stop() error { } else if closeSocketErr != nil { return closeSocketErr } + return killErr } // Implement the storagedriver.StorageDriver interface over IPC func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { + if err := driver.exited(); err != nil { + return nil, err + } + receiver, remoteSender := libchan.Pipe() params := map[string]interface{}{"Path": path} @@ -164,8 +186,8 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { return nil, err } - var response ReadStreamResponse - err = receiver.Receive(&response) + response := new(ReadStreamResponse) + err = driver.receiveResponse(receiver, response) if err != nil { return nil, err } @@ -183,6 +205,10 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { } func (driver *StorageDriverClient) PutContent(path string, contents []byte) error { + if err := driver.exited(); err != nil { + return err + } + receiver, remoteSender := libchan.Pipe() params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))} @@ -191,8 +217,8 @@ func (driver *StorageDriverClient) PutContent(path string, contents []byte) erro return err } - var response WriteStreamResponse - err = receiver.Receive(&response) + response := new(WriteStreamResponse) + err = driver.receiveResponse(receiver, response) if err != nil { return err } @@ -205,16 +231,19 @@ func (driver *StorageDriverClient) PutContent(path string, contents []byte) erro } func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) { - receiver, remoteSender := libchan.Pipe() + if err := driver.exited(); err != nil { + return nil, err + } + receiver, remoteSender := libchan.Pipe() params := map[string]interface{}{"Path": path, "Offset": offset} err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return nil, err } - var response ReadStreamResponse - err = receiver.Receive(&response) + response := new(ReadStreamResponse) + err = driver.receiveResponse(receiver, response) if err != nil { return nil, err } @@ -227,16 +256,19 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re } func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { - receiver, remoteSender := libchan.Pipe() + if err := driver.exited(); err != nil { + return err + } + receiver, remoteSender := libchan.Pipe() params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": ioutil.NopCloser(reader)} err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return err } - var response WriteStreamResponse - err = receiver.Receive(&response) + response := new(WriteStreamResponse) + err = driver.receiveResponse(receiver, response) if err != nil { return err } @@ -249,16 +281,19 @@ func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, } func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) { - receiver, remoteSender := libchan.Pipe() + if err := driver.exited(); err != nil { + return 0, err + } + receiver, remoteSender := libchan.Pipe() params := map[string]interface{}{"Path": path} err := driver.sender.Send(&Request{Type: "CurrentSize", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return 0, err } - var response CurrentSizeResponse - err = receiver.Receive(&response) + response := new(CurrentSizeResponse) + err = driver.receiveResponse(receiver, response) if err != nil { return 0, err } @@ -271,16 +306,19 @@ func (driver *StorageDriverClient) CurrentSize(path string) (uint64, error) { } func (driver *StorageDriverClient) List(path string) ([]string, error) { - receiver, remoteSender := libchan.Pipe() + if err := driver.exited(); err != nil { + return nil, err + } + receiver, remoteSender := libchan.Pipe() params := map[string]interface{}{"Path": path} err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return nil, err } - var response ListResponse - err = receiver.Receive(&response) + response := new(ListResponse) + err = driver.receiveResponse(receiver, response) if err != nil { return nil, err } @@ -293,16 +331,19 @@ func (driver *StorageDriverClient) List(path string) ([]string, error) { } func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error { - receiver, remoteSender := libchan.Pipe() + if err := driver.exited(); err != nil { + return err + } + receiver, remoteSender := libchan.Pipe() params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath} err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return err } - var response MoveResponse - err = receiver.Receive(&response) + response := new(MoveResponse) + err = driver.receiveResponse(receiver, response) if err != nil { return err } @@ -315,16 +356,19 @@ func (driver *StorageDriverClient) Move(sourcePath string, destPath string) erro } func (driver *StorageDriverClient) Delete(path string) error { - receiver, remoteSender := libchan.Pipe() + if err := driver.exited(); err != nil { + return err + } + receiver, remoteSender := libchan.Pipe() params := map[string]interface{}{"Path": path} err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return err } - var response DeleteResponse - err = receiver.Receive(&response) + response := new(DeleteResponse) + err = driver.receiveResponse(receiver, response) if err != nil { return err } @@ -335,3 +379,66 @@ func (driver *StorageDriverClient) Delete(path string) error { return nil } + +// handleSubprocessExit populates the exit channel until we have explicitly +// stopped the storage driver subprocess +// Requests can select on driver.exitChan and response receiving and not hang if +// the process exits +func (driver *StorageDriverClient) handleSubprocessExit() { + exitErr := driver.subprocess.Wait() + if exitErr == nil { + exitErr = fmt.Errorf("Storage driver subprocess already exited cleanly") + } else { + exitErr = fmt.Errorf("Storage driver subprocess exited with error: %s", exitErr) + } + + driver.exitErr = exitErr + + for { + select { + case driver.exitChan <- exitErr: + case <-driver.stopChan: + close(driver.exitChan) + return + } + } +} + +// receiveResponse populates the response value with the next result from the +// given receiver, or returns an error if receiving failed or the driver has +// stopped +func (driver *StorageDriverClient) receiveResponse(receiver libchan.Receiver, response interface{}) error { + receiveChan := make(chan error, 1) + go func(receiveChan chan<- error) { + defer close(receiveChan) + receiveChan <- receiver.Receive(response) + }(receiveChan) + + var err error + var ok bool + select { + case err = <-receiveChan: + case err, ok = <-driver.exitChan: + go func(receiveChan <-chan error) { + <-receiveChan + }(receiveChan) + if !ok { + err = driver.exitErr + } + } + + return err +} + +// exited returns an exit error if the driver has exited or nil otherwise +func (driver *StorageDriverClient) exited() error { + select { + case err, ok := <-driver.exitChan: + if !ok { + return driver.exitErr + } + return err + default: + return nil + } +}