From 7c892deb1c12c8587b24a2f57cad8f56f9a0817d Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Fri, 24 Oct 2014 18:33:23 -0700 Subject: [PATCH] Uses streams internally for ipc Get/Put Content This is done because libchan/spdystream does not currently support sending serialzied objects of size larger than 16MB See https://github.com/docker/libchan/issues/65 --- storagedriver/ipc/client.go | 18 ++++++++++++------ storagedriver/ipc/ipc.go | 20 +++++++++----------- storagedriver/ipc/server.go | 23 +++++++++++++++-------- storagedriver/testsuites/testsuites.go | 2 +- 4 files changed, 37 insertions(+), 26 deletions(-) diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go index c4e50a4d5..0025d2bc0 100644 --- a/storagedriver/ipc/client.go +++ b/storagedriver/ipc/client.go @@ -1,8 +1,10 @@ package ipc import ( + "bytes" "encoding/json" "io" + "io/ioutil" "net" "os" "os/exec" @@ -116,7 +118,7 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { return nil, err } - var response GetContentResponse + var response ReadStreamResponse err = receiver.Receive(&response) if err != nil { return nil, err @@ -126,22 +128,26 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { return nil, response.Error } - return response.Content, nil + defer response.Reader.Close() + contents, err := ioutil.ReadAll(response.Reader) + if err != nil { + return nil, err + } + return contents, nil } func (driver *StorageDriverClient) PutContent(path string, contents []byte) error { receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"Path": path, "Contents": contents} + params := map[string]interface{}{"Path": path, "Reader": WrapReader(bytes.NewReader(contents))} err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return err } - var response PutContentResponse + var response WriteStreamResponse err = receiver.Receive(&response) if err != nil { - panic(err) return err } @@ -177,7 +183,7 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { receiver, remoteSender := libchan.Pipe() - params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReadCloser(reader)} + params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReader(reader)} err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender}) if err != nil { return err diff --git a/storagedriver/ipc/ipc.go b/storagedriver/ipc/ipc.go index ab960b82f..89b0cf206 100644 --- a/storagedriver/ipc/ipc.go +++ b/storagedriver/ipc/ipc.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "reflect" "github.com/docker/libchan" @@ -23,8 +24,14 @@ func (r noWriteReadWriteCloser) Write(p []byte) (n int, err error) { return 0, errors.New("Write unsupported") } -func WrapReadCloser(readCloser io.ReadCloser) io.ReadWriteCloser { - return noWriteReadWriteCloser{readCloser} +func WrapReader(reader io.Reader) io.ReadWriteCloser { + if readWriteCloser, ok := reader.(io.ReadWriteCloser); ok { + return readWriteCloser + } else if readCloser, ok := reader.(io.ReadCloser); ok { + return noWriteReadWriteCloser{readCloser} + } else { + return noWriteReadWriteCloser{ioutil.NopCloser(reader)} + } } type responseError struct { @@ -46,15 +53,6 @@ func (err *responseError) Error() string { return fmt.Sprintf("%s: %s", err.Type, err.Message) } -type GetContentResponse struct { - Content []byte - Error *responseError -} - -type PutContentResponse struct { - Error *responseError -} - type ReadStreamResponse struct { Reader io.ReadWriteCloser Error *responseError diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go index 2e240f428..0d39a31bf 100644 --- a/storagedriver/ipc/server.go +++ b/storagedriver/ipc/server.go @@ -1,7 +1,9 @@ package ipc import ( + "bytes" "io" + "io/ioutil" "net" "os" @@ -44,14 +46,15 @@ func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) { } func handleRequest(driver storagedriver.StorageDriver, request Request) { - switch request.Type { case "GetContent": path, _ := request.Parameters["Path"].(string) content, err := driver.GetContent(path) - response := GetContentResponse{ - Content: content, - Error: ResponseError(err), + var response ReadStreamResponse + if err != nil { + response = ReadStreamResponse{Error: ResponseError(err)} + } else { + response = ReadStreamResponse{Reader: WrapReader(bytes.NewReader(content))} } err = request.ResponseChannel.Send(&response) if err != nil { @@ -59,9 +62,13 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { } case "PutContent": path, _ := request.Parameters["Path"].(string) - contents, _ := request.Parameters["Contents"].([]byte) - err := driver.PutContent(path, contents) - response := PutContentResponse{ + reader, _ := request.Parameters["Reader"].(io.ReadCloser) + contents, err := ioutil.ReadAll(reader) + defer reader.Close() + if err == nil { + err = driver.PutContent(path, contents) + } + response := WriteStreamResponse{ Error: ResponseError(err), } err = request.ResponseChannel.Send(&response) @@ -82,7 +89,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) { if err != nil { response = ReadStreamResponse{Error: ResponseError(err)} } else { - response = ReadStreamResponse{Reader: WrapReadCloser(reader)} + response = ReadStreamResponse{Reader: WrapReader(reader)} } err = request.ResponseChannel.Send(&response) if err != nil { diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go index 7ca196d6e..d9d3dead7 100644 --- a/storagedriver/testsuites/testsuites.go +++ b/storagedriver/testsuites/testsuites.go @@ -128,7 +128,7 @@ func (suite *DriverSuite) TestWriteReadStreams4(c *C) { func (suite *DriverSuite) TestContinueStreamAppend(c *C) { filename := randomString(32) - chunkSize := uint64(32) + chunkSize := uint64(5 * 1024 * 1024) contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize))