Uses streams internally for ipc Get/Put Content

This is done because libchan/spdystream does not currently support sending
serialzied objects of size larger than 16MB
See https://github.com/docker/libchan/issues/65
This commit is contained in:
Brian Bland 2014-10-24 18:33:23 -07:00
parent 3f95694180
commit 7c892deb1c
4 changed files with 37 additions and 26 deletions

View file

@ -1,8 +1,10 @@
package ipc package ipc
import ( import (
"bytes"
"encoding/json" "encoding/json"
"io" "io"
"io/ioutil"
"net" "net"
"os" "os"
"os/exec" "os/exec"
@ -116,7 +118,7 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
return nil, err return nil, err
} }
var response GetContentResponse var response ReadStreamResponse
err = receiver.Receive(&response) err = receiver.Receive(&response)
if err != nil { if err != nil {
return nil, err return nil, err
@ -126,22 +128,26 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
return nil, response.Error return nil, response.Error
} }
return response.Content, nil defer response.Reader.Close()
contents, err := ioutil.ReadAll(response.Reader)
if err != nil {
return nil, err
}
return contents, nil
} }
func (driver *StorageDriverClient) PutContent(path string, contents []byte) error { func (driver *StorageDriverClient) PutContent(path string, contents []byte) error {
receiver, remoteSender := libchan.Pipe() receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Contents": contents} params := map[string]interface{}{"Path": path, "Reader": WrapReader(bytes.NewReader(contents))}
err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender}) err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender})
if err != nil { if err != nil {
return err return err
} }
var response PutContentResponse var response WriteStreamResponse
err = receiver.Receive(&response) err = receiver.Receive(&response)
if err != nil { if err != nil {
panic(err)
return err return err
} }
@ -177,7 +183,7 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re
func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
receiver, remoteSender := libchan.Pipe() receiver, remoteSender := libchan.Pipe()
params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReadCloser(reader)} params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReader(reader)}
err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender}) err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender})
if err != nil { if err != nil {
return err return err

View file

@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"reflect" "reflect"
"github.com/docker/libchan" "github.com/docker/libchan"
@ -23,8 +24,14 @@ func (r noWriteReadWriteCloser) Write(p []byte) (n int, err error) {
return 0, errors.New("Write unsupported") return 0, errors.New("Write unsupported")
} }
func WrapReadCloser(readCloser io.ReadCloser) io.ReadWriteCloser { func WrapReader(reader io.Reader) io.ReadWriteCloser {
if readWriteCloser, ok := reader.(io.ReadWriteCloser); ok {
return readWriteCloser
} else if readCloser, ok := reader.(io.ReadCloser); ok {
return noWriteReadWriteCloser{readCloser} return noWriteReadWriteCloser{readCloser}
} else {
return noWriteReadWriteCloser{ioutil.NopCloser(reader)}
}
} }
type responseError struct { type responseError struct {
@ -46,15 +53,6 @@ func (err *responseError) Error() string {
return fmt.Sprintf("%s: %s", err.Type, err.Message) return fmt.Sprintf("%s: %s", err.Type, err.Message)
} }
type GetContentResponse struct {
Content []byte
Error *responseError
}
type PutContentResponse struct {
Error *responseError
}
type ReadStreamResponse struct { type ReadStreamResponse struct {
Reader io.ReadWriteCloser Reader io.ReadWriteCloser
Error *responseError Error *responseError

View file

@ -1,7 +1,9 @@
package ipc package ipc
import ( import (
"bytes"
"io" "io"
"io/ioutil"
"net" "net"
"os" "os"
@ -44,14 +46,15 @@ func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) {
} }
func handleRequest(driver storagedriver.StorageDriver, request Request) { func handleRequest(driver storagedriver.StorageDriver, request Request) {
switch request.Type { switch request.Type {
case "GetContent": case "GetContent":
path, _ := request.Parameters["Path"].(string) path, _ := request.Parameters["Path"].(string)
content, err := driver.GetContent(path) content, err := driver.GetContent(path)
response := GetContentResponse{ var response ReadStreamResponse
Content: content, if err != nil {
Error: ResponseError(err), response = ReadStreamResponse{Error: ResponseError(err)}
} else {
response = ReadStreamResponse{Reader: WrapReader(bytes.NewReader(content))}
} }
err = request.ResponseChannel.Send(&response) err = request.ResponseChannel.Send(&response)
if err != nil { if err != nil {
@ -59,9 +62,13 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) {
} }
case "PutContent": case "PutContent":
path, _ := request.Parameters["Path"].(string) path, _ := request.Parameters["Path"].(string)
contents, _ := request.Parameters["Contents"].([]byte) reader, _ := request.Parameters["Reader"].(io.ReadCloser)
err := driver.PutContent(path, contents) contents, err := ioutil.ReadAll(reader)
response := PutContentResponse{ defer reader.Close()
if err == nil {
err = driver.PutContent(path, contents)
}
response := WriteStreamResponse{
Error: ResponseError(err), Error: ResponseError(err),
} }
err = request.ResponseChannel.Send(&response) err = request.ResponseChannel.Send(&response)
@ -82,7 +89,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) {
if err != nil { if err != nil {
response = ReadStreamResponse{Error: ResponseError(err)} response = ReadStreamResponse{Error: ResponseError(err)}
} else { } else {
response = ReadStreamResponse{Reader: WrapReadCloser(reader)} response = ReadStreamResponse{Reader: WrapReader(reader)}
} }
err = request.ResponseChannel.Send(&response) err = request.ResponseChannel.Send(&response)
if err != nil { if err != nil {

View file

@ -128,7 +128,7 @@ func (suite *DriverSuite) TestWriteReadStreams4(c *C) {
func (suite *DriverSuite) TestContinueStreamAppend(c *C) { func (suite *DriverSuite) TestContinueStreamAppend(c *C) {
filename := randomString(32) filename := randomString(32)
chunkSize := uint64(32) chunkSize := uint64(5 * 1024 * 1024)
contentsChunk1 := []byte(randomString(chunkSize)) contentsChunk1 := []byte(randomString(chunkSize))
contentsChunk2 := []byte(randomString(chunkSize)) contentsChunk2 := []byte(randomString(chunkSize))