From 3f95694180729cf78629f4062617ac10132bec50 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 21 Oct 2014 15:02:20 -0700 Subject: [PATCH] Adds storage driver interface, tests, and two basic implementations --- .travis.yml | 5 + main/storagedriver/filesystem/filesystem.go | 26 ++ main/storagedriver/inmemory/inmemory.go | 10 + storagedriver/filesystem/filesystem.go | 173 ++++++++++ storagedriver/filesystem/filesystem_test.go | 24 ++ storagedriver/inmemory/inmemory.go | 147 ++++++++ storagedriver/inmemory/inmemory_test.go | 20 ++ storagedriver/ipc/client.go | 285 ++++++++++++++++ storagedriver/ipc/ipc.go | 83 +++++ storagedriver/ipc/server.go | 160 +++++++++ storagedriver/storagedriver.go | 34 ++ storagedriver/testsuites/testsuites.go | 353 ++++++++++++++++++++ 12 files changed, 1320 insertions(+) create mode 100644 .travis.yml create mode 100644 main/storagedriver/filesystem/filesystem.go create mode 100644 main/storagedriver/inmemory/inmemory.go create mode 100644 storagedriver/filesystem/filesystem.go create mode 100644 storagedriver/filesystem/filesystem_test.go create mode 100644 storagedriver/inmemory/inmemory.go create mode 100644 storagedriver/inmemory/inmemory_test.go create mode 100644 storagedriver/ipc/client.go create mode 100644 storagedriver/ipc/ipc.go create mode 100644 storagedriver/ipc/server.go create mode 100644 storagedriver/storagedriver.go create mode 100644 storagedriver/testsuites/testsuites.go diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 000000000..d48424c34 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,5 @@ +language: go + +go: +- 1.3 +- tip diff --git a/main/storagedriver/filesystem/filesystem.go b/main/storagedriver/filesystem/filesystem.go new file mode 100644 index 000000000..8c0e26770 --- /dev/null +++ b/main/storagedriver/filesystem/filesystem.go @@ -0,0 +1,26 @@ +package main + +import ( + "encoding/json" + "os" + + "github.com/docker/docker-registry/storagedriver/filesystem" + "github.com/docker/docker-registry/storagedriver/ipc" +) + +func main() { + parametersBytes := []byte(os.Args[1]) + var parameters map[string]interface{} + err := json.Unmarshal(parametersBytes, ¶meters) + if err != nil { + panic(err) + } + rootDirectory := "/tmp/registry" + if parameters != nil { + rootDirParam, ok := parameters["RootDirectory"].(string) + if ok && rootDirParam != "" { + rootDirectory = rootDirParam + } + } + ipc.Server(filesystem.NewDriver(rootDirectory)) +} diff --git a/main/storagedriver/inmemory/inmemory.go b/main/storagedriver/inmemory/inmemory.go new file mode 100644 index 000000000..f55c8d5f7 --- /dev/null +++ b/main/storagedriver/inmemory/inmemory.go @@ -0,0 +1,10 @@ +package main + +import ( + "github.com/docker/docker-registry/storagedriver/inmemory" + "github.com/docker/docker-registry/storagedriver/ipc" +) + +func main() { + ipc.Server(inmemory.NewDriver()) +} diff --git a/storagedriver/filesystem/filesystem.go b/storagedriver/filesystem/filesystem.go new file mode 100644 index 000000000..79106e378 --- /dev/null +++ b/storagedriver/filesystem/filesystem.go @@ -0,0 +1,173 @@ +package filesystem + +import ( + "io" + "io/ioutil" + "os" + "path" + "strings" + + "github.com/docker/docker-registry/storagedriver" +) + +type FilesystemDriver struct { + rootDirectory string +} + +func NewDriver(rootDirectory string) *FilesystemDriver { + return &FilesystemDriver{rootDirectory} +} + +func (d *FilesystemDriver) subPath(subPath string) string { + return path.Join(d.rootDirectory, subPath) +} + +func (d *FilesystemDriver) GetContent(path string) ([]byte, error) { + contents, err := ioutil.ReadFile(d.subPath(path)) + if err != nil { + return nil, storagedriver.PathNotFoundError{path} + } + return contents, nil +} + +func (d *FilesystemDriver) PutContent(subPath string, contents []byte) error { + fullPath := d.subPath(subPath) + parentDir := path.Dir(fullPath) + err := os.MkdirAll(parentDir, 0755) + if err != nil { + return err + } + + err = ioutil.WriteFile(fullPath, contents, 0644) + return err +} + +func (d *FilesystemDriver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { + file, err := os.OpenFile(d.subPath(path), os.O_RDONLY, 0644) + if err != nil { + return nil, err + } + + seekPos, err := file.Seek(int64(offset), os.SEEK_SET) + if err != nil { + file.Close() + return nil, err + } else if seekPos < int64(offset) { + file.Close() + return nil, storagedriver.InvalidOffsetError{path, offset} + } + + return file, nil +} + +func (d *FilesystemDriver) WriteStream(subPath string, offset, size uint64, reader io.ReadCloser) error { + defer reader.Close() + + resumableOffset, err := d.ResumeWritePosition(subPath) + if _, pathNotFound := err.(storagedriver.PathNotFoundError); err != nil && !pathNotFound { + return err + } + + if offset > resumableOffset { + return storagedriver.InvalidOffsetError{subPath, offset} + } + + fullPath := d.subPath(subPath) + parentDir := path.Dir(fullPath) + err = os.MkdirAll(parentDir, 0755) + if err != nil { + return err + } + + var file *os.File + if offset == 0 { + file, err = os.Create(fullPath) + } else { + file, err = os.OpenFile(fullPath, os.O_WRONLY|os.O_APPEND, 0) + } + + if err != nil { + return err + } + defer file.Close() + + buf := make([]byte, 32*1024) + for { + bytesRead, er := reader.Read(buf) + if bytesRead > 0 { + bytesWritten, ew := file.WriteAt(buf[0:bytesRead], int64(offset)) + if bytesWritten > 0 { + offset += uint64(bytesWritten) + } + if ew != nil { + err = ew + break + } + if bytesRead != bytesWritten { + err = io.ErrShortWrite + break + } + } + if er == io.EOF { + break + } + if er != nil { + err = er + break + } + } + return err +} + +func (d *FilesystemDriver) ResumeWritePosition(subPath string) (uint64, error) { + fullPath := d.subPath(subPath) + + fileInfo, err := os.Stat(fullPath) + if err != nil && !os.IsNotExist(err) { + return 0, err + } else if err != nil { + return 0, storagedriver.PathNotFoundError{subPath} + } + return uint64(fileInfo.Size()), nil +} + +func (d *FilesystemDriver) List(prefix string) ([]string, error) { + prefix = strings.TrimRight(prefix, "/") + fullPath := d.subPath(prefix) + + dir, err := os.Open(fullPath) + if err != nil { + return nil, err + } + + fileNames, err := dir.Readdirnames(0) + if err != nil { + return nil, err + } + + keys := make([]string, 0, len(fileNames)) + for _, fileName := range fileNames { + keys = append(keys, path.Join(prefix, fileName)) + } + + return keys, nil +} + +func (d *FilesystemDriver) Move(sourcePath string, destPath string) error { + err := os.Rename(d.subPath(sourcePath), d.subPath(destPath)) + return err +} + +func (d *FilesystemDriver) Delete(subPath string) error { + fullPath := d.subPath(subPath) + + _, err := os.Stat(fullPath) + if err != nil && !os.IsNotExist(err) { + return err + } else if err != nil { + return storagedriver.PathNotFoundError{subPath} + } + + err = os.RemoveAll(fullPath) + return err +} diff --git a/storagedriver/filesystem/filesystem_test.go b/storagedriver/filesystem/filesystem_test.go new file mode 100644 index 000000000..c445e1782 --- /dev/null +++ b/storagedriver/filesystem/filesystem_test.go @@ -0,0 +1,24 @@ +package filesystem + +import ( + "os" + "testing" + + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/testsuites" + . "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { TestingT(t) } + +func init() { + rootDirectory := "/tmp/driver" + os.RemoveAll(rootDirectory) + + filesystemDriverConstructor := func() (storagedriver.StorageDriver, error) { + return NewDriver(rootDirectory), nil + } + testsuites.RegisterInProcessSuite(filesystemDriverConstructor) + testsuites.RegisterIPCSuite("filesystem", map[string]string{"RootDirectory": rootDirectory}) +} diff --git a/storagedriver/inmemory/inmemory.go b/storagedriver/inmemory/inmemory.go new file mode 100644 index 000000000..ea44bb398 --- /dev/null +++ b/storagedriver/inmemory/inmemory.go @@ -0,0 +1,147 @@ +package inmemory + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "regexp" + "strings" + "sync" + + "github.com/docker/docker-registry/storagedriver" +) + +type InMemoryDriver struct { + storage map[string][]byte + mutex sync.RWMutex +} + +func NewDriver() *InMemoryDriver { + return &InMemoryDriver{storage: make(map[string][]byte)} +} + +func (d *InMemoryDriver) GetContent(path string) ([]byte, error) { + d.mutex.RLock() + defer d.mutex.RUnlock() + contents, ok := d.storage[path] + if !ok { + return nil, storagedriver.PathNotFoundError{path} + } + return contents, nil +} + +func (d *InMemoryDriver) PutContent(path string, contents []byte) error { + d.mutex.Lock() + defer d.mutex.Unlock() + d.storage[path] = contents + return nil +} + +func (d *InMemoryDriver) ReadStream(path string, offset uint64) (io.ReadCloser, error) { + d.mutex.RLock() + defer d.mutex.RUnlock() + contents, err := d.GetContent(path) + if err != nil { + return nil, err + } else if len(contents) < int(offset) { + return nil, storagedriver.InvalidOffsetError{path, offset} + } + + src := contents[offset:] + buf := make([]byte, len(src)) + copy(buf, src) + return ioutil.NopCloser(bytes.NewReader(buf)), nil +} + +func (d *InMemoryDriver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { + defer reader.Close() + d.mutex.RLock() + defer d.mutex.RUnlock() + + resumableOffset, err := d.ResumeWritePosition(path) + if err != nil { + return err + } + + if offset > resumableOffset { + return storagedriver.InvalidOffsetError{path, offset} + } + + contents, err := ioutil.ReadAll(reader) + if err != nil { + return err + } + + if offset > 0 { + contents = append(d.storage[path][0:offset], contents...) + } + + d.storage[path] = contents + return nil +} + +func (d *InMemoryDriver) ResumeWritePosition(path string) (uint64, error) { + d.mutex.RLock() + defer d.mutex.RUnlock() + contents, ok := d.storage[path] + if !ok { + return 0, nil + } + return uint64(len(contents)), nil +} + +func (d *InMemoryDriver) List(prefix string) ([]string, error) { + subPathMatcher, err := regexp.Compile(fmt.Sprintf("^%s/[^/]+", prefix)) + if err != nil { + return nil, err + } + + d.mutex.RLock() + defer d.mutex.RUnlock() + // we use map to collect uniq keys + keySet := make(map[string]struct{}) + for k := range d.storage { + if key := subPathMatcher.FindString(k); key != "" { + keySet[key] = struct{}{} + } + } + + keys := make([]string, 0, len(keySet)) + for k := range keySet { + keys = append(keys, k) + } + return keys, nil +} + +func (d *InMemoryDriver) Move(sourcePath string, destPath string) error { + d.mutex.Lock() + defer d.mutex.Unlock() + contents, ok := d.storage[sourcePath] + if !ok { + return storagedriver.PathNotFoundError{sourcePath} + } + d.storage[destPath] = contents + delete(d.storage, sourcePath) + return nil +} + +func (d *InMemoryDriver) Delete(path string) error { + d.mutex.Lock() + defer d.mutex.Unlock() + subPaths := make([]string, 0) + for k := range d.storage { + if strings.HasPrefix(k, path) { + subPaths = append(subPaths, k) + } + } + + if len(subPaths) == 0 { + return storagedriver.PathNotFoundError{path} + } + + for _, subPath := range subPaths { + delete(d.storage, subPath) + } + return nil +} diff --git a/storagedriver/inmemory/inmemory_test.go b/storagedriver/inmemory/inmemory_test.go new file mode 100644 index 000000000..fa62d30d4 --- /dev/null +++ b/storagedriver/inmemory/inmemory_test.go @@ -0,0 +1,20 @@ +package inmemory + +import ( + "testing" + + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/testsuites" + . "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { TestingT(t) } + +func init() { + inmemoryDriverConstructor := func() (storagedriver.StorageDriver, error) { + return NewDriver(), nil + } + testsuites.RegisterInProcessSuite(inmemoryDriverConstructor) + testsuites.RegisterIPCSuite("inmemory", nil) +} diff --git a/storagedriver/ipc/client.go b/storagedriver/ipc/client.go new file mode 100644 index 000000000..c4e50a4d5 --- /dev/null +++ b/storagedriver/ipc/client.go @@ -0,0 +1,285 @@ +package ipc + +import ( + "encoding/json" + "io" + "net" + "os" + "os/exec" + "path" + "syscall" + + "github.com/docker/libchan" + "github.com/docker/libchan/spdy" +) + +type StorageDriverClient struct { + subprocess *exec.Cmd + socket *os.File + transport *spdy.Transport + sender libchan.Sender +} + +func NewDriverClient(name string, parameters map[string]string) (*StorageDriverClient, error) { + paramsBytes, err := json.Marshal(parameters) + if err != nil { + return nil, err + } + + driverPath := os.ExpandEnv(path.Join("$GOPATH", "bin", name)) + if _, err := os.Stat(driverPath); os.IsNotExist(err) { + driverPath = path.Join(path.Dir(os.Args[0]), name) + } + if _, err := os.Stat(driverPath); os.IsNotExist(err) { + driverPath, err = exec.LookPath(name) + if err != nil { + return nil, err + } + } + + command := exec.Command(driverPath, string(paramsBytes)) + + return &StorageDriverClient{ + subprocess: command, + }, nil +} + +func (driver *StorageDriverClient) Start() error { + fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0) + if err != nil { + return err + } + + childSocket := os.NewFile(uintptr(fileDescriptors[0]), "childSocket") + parentSocket := os.NewFile(uintptr(fileDescriptors[1]), "parentSocket") + + driver.subprocess.Stdout = os.Stdout + driver.subprocess.Stderr = os.Stderr + driver.subprocess.ExtraFiles = []*os.File{childSocket} + + if err = driver.subprocess.Start(); err != nil { + parentSocket.Close() + return err + } + + if err = childSocket.Close(); err != nil { + parentSocket.Close() + return err + } + + connection, err := net.FileConn(parentSocket) + if err != nil { + parentSocket.Close() + return err + } + transport, err := spdy.NewClientTransport(connection) + if err != nil { + parentSocket.Close() + return err + } + sender, err := transport.NewSendChannel() + if err != nil { + transport.Close() + parentSocket.Close() + return err + } + + driver.socket = parentSocket + driver.transport = transport + driver.sender = sender + + return nil +} + +func (driver *StorageDriverClient) Stop() error { + closeSenderErr := driver.sender.Close() + closeTransportErr := driver.transport.Close() + closeSocketErr := driver.socket.Close() + killErr := driver.subprocess.Process.Kill() + + if closeSenderErr != nil { + return closeSenderErr + } else if closeTransportErr != nil { + return closeTransportErr + } else if closeSocketErr != nil { + return closeSocketErr + } + return killErr +} + +func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path} + err := driver.sender.Send(&Request{Type: "GetContent", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return nil, err + } + + var response GetContentResponse + err = receiver.Receive(&response) + if err != nil { + return nil, err + } + + if response.Error != nil { + return nil, response.Error + } + + return response.Content, nil +} + +func (driver *StorageDriverClient) PutContent(path string, contents []byte) error { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path, "Contents": contents} + err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return err + } + + var response PutContentResponse + err = receiver.Receive(&response) + if err != nil { + panic(err) + return err + } + + if response.Error != nil { + return response.Error + } + + return nil +} + +func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path, "Offset": offset} + err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return nil, err + } + + var response ReadStreamResponse + err = receiver.Receive(&response) + if err != nil { + return nil, err + } + + if response.Error != nil { + return nil, response.Error + } + + return response.Reader, nil +} + +func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReadCloser(reader)} + err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return err + } + + var response WriteStreamResponse + err = receiver.Receive(&response) + if err != nil { + return err + } + + if response.Error != nil { + return response.Error + } + + return nil +} + +func (driver *StorageDriverClient) ResumeWritePosition(path string) (uint64, error) { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path} + err := driver.sender.Send(&Request{Type: "ResumeWritePosition", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return 0, err + } + + var response ResumeWritePositionResponse + err = receiver.Receive(&response) + if err != nil { + return 0, err + } + + if response.Error != nil { + return 0, response.Error + } + + return response.Position, nil +} + +func (driver *StorageDriverClient) List(prefix string) ([]string, error) { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Prefix": prefix} + err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return nil, err + } + + var response ListResponse + err = receiver.Receive(&response) + if err != nil { + return nil, err + } + + if response.Error != nil { + return nil, response.Error + } + + return response.Keys, nil +} + +func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath} + err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return err + } + + var response MoveResponse + err = receiver.Receive(&response) + if err != nil { + return err + } + + if response.Error != nil { + return response.Error + } + + return nil +} + +func (driver *StorageDriverClient) Delete(path string) error { + receiver, remoteSender := libchan.Pipe() + + params := map[string]interface{}{"Path": path} + err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender}) + if err != nil { + return err + } + + var response DeleteResponse + err = receiver.Receive(&response) + if err != nil { + return err + } + + if response.Error != nil { + return response.Error + } + + return nil +} diff --git a/storagedriver/ipc/ipc.go b/storagedriver/ipc/ipc.go new file mode 100644 index 000000000..ab960b82f --- /dev/null +++ b/storagedriver/ipc/ipc.go @@ -0,0 +1,83 @@ +package ipc + +import ( + "errors" + "fmt" + "io" + "reflect" + + "github.com/docker/libchan" +) + +type Request struct { + Type string + Parameters map[string]interface{} + ResponseChannel libchan.Sender +} + +type noWriteReadWriteCloser struct { + io.ReadCloser +} + +func (r noWriteReadWriteCloser) Write(p []byte) (n int, err error) { + return 0, errors.New("Write unsupported") +} + +func WrapReadCloser(readCloser io.ReadCloser) io.ReadWriteCloser { + return noWriteReadWriteCloser{readCloser} +} + +type responseError struct { + Type string + Message string +} + +func ResponseError(err error) *responseError { + if err == nil { + return nil + } + return &responseError{ + Type: reflect.TypeOf(err).String(), + Message: err.Error(), + } +} + +func (err *responseError) Error() string { + return fmt.Sprintf("%s: %s", err.Type, err.Message) +} + +type GetContentResponse struct { + Content []byte + Error *responseError +} + +type PutContentResponse struct { + Error *responseError +} + +type ReadStreamResponse struct { + Reader io.ReadWriteCloser + Error *responseError +} + +type WriteStreamResponse struct { + Error *responseError +} + +type ResumeWritePositionResponse struct { + Position uint64 + Error *responseError +} + +type ListResponse struct { + Keys []string + Error *responseError +} + +type MoveResponse struct { + Error *responseError +} + +type DeleteResponse struct { + Error *responseError +} diff --git a/storagedriver/ipc/server.go b/storagedriver/ipc/server.go new file mode 100644 index 000000000..2e240f428 --- /dev/null +++ b/storagedriver/ipc/server.go @@ -0,0 +1,160 @@ +package ipc + +import ( + "io" + "net" + "os" + + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/libchan" + "github.com/docker/libchan/spdy" +) + +func Server(driver storagedriver.StorageDriver) error { + childSocket := os.NewFile(3, "childSocket") + defer childSocket.Close() + conn, err := net.FileConn(childSocket) + if err != nil { + panic(err) + } + defer conn.Close() + if transport, err := spdy.NewServerTransport(conn); err != nil { + panic(err) + } else { + for { + receiver, err := transport.WaitReceiveChannel() + if err != nil { + panic(err) + } + go receive(driver, receiver) + } + return nil + } +} + +func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) { + for { + var request Request + err := receiver.Receive(&request) + if err != nil { + panic(err) + } + go handleRequest(driver, request) + } +} + +func handleRequest(driver storagedriver.StorageDriver, request Request) { + + switch request.Type { + case "GetContent": + path, _ := request.Parameters["Path"].(string) + content, err := driver.GetContent(path) + response := GetContentResponse{ + Content: content, + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "PutContent": + path, _ := request.Parameters["Path"].(string) + contents, _ := request.Parameters["Contents"].([]byte) + err := driver.PutContent(path, contents) + response := PutContentResponse{ + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "ReadStream": + var offset uint64 + + path, _ := request.Parameters["Path"].(string) + offset, ok := request.Parameters["Offset"].(uint64) + if !ok { + offsetSigned, _ := request.Parameters["Offset"].(int64) + offset = uint64(offsetSigned) + } + reader, err := driver.ReadStream(path, offset) + var response ReadStreamResponse + if err != nil { + response = ReadStreamResponse{Error: ResponseError(err)} + } else { + response = ReadStreamResponse{Reader: WrapReadCloser(reader)} + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "WriteStream": + var offset uint64 + + path, _ := request.Parameters["Path"].(string) + offset, ok := request.Parameters["Offset"].(uint64) + if !ok { + offsetSigned, _ := request.Parameters["Offset"].(int64) + offset = uint64(offsetSigned) + } + size, ok := request.Parameters["Size"].(uint64) + if !ok { + sizeSigned, _ := request.Parameters["Size"].(int64) + size = uint64(sizeSigned) + } + reader, _ := request.Parameters["Reader"].(io.ReadCloser) + err := driver.WriteStream(path, offset, size, reader) + response := WriteStreamResponse{ + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "ResumeWritePosition": + path, _ := request.Parameters["Path"].(string) + position, err := driver.ResumeWritePosition(path) + response := ResumeWritePositionResponse{ + Position: position, + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "List": + prefix, _ := request.Parameters["Prefix"].(string) + keys, err := driver.List(prefix) + response := ListResponse{ + Keys: keys, + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "Move": + sourcePath, _ := request.Parameters["SourcePath"].(string) + destPath, _ := request.Parameters["DestPath"].(string) + err := driver.Move(sourcePath, destPath) + response := MoveResponse{ + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + case "Delete": + path, _ := request.Parameters["Path"].(string) + err := driver.Delete(path) + response := DeleteResponse{ + Error: ResponseError(err), + } + err = request.ResponseChannel.Send(&response) + if err != nil { + panic(err) + } + default: + panic(request) + } +} diff --git a/storagedriver/storagedriver.go b/storagedriver/storagedriver.go new file mode 100644 index 000000000..bfbfc110a --- /dev/null +++ b/storagedriver/storagedriver.go @@ -0,0 +1,34 @@ +package storagedriver + +import ( + "fmt" + "io" +) + +type StorageDriver interface { + GetContent(path string) ([]byte, error) + PutContent(path string, content []byte) error + ReadStream(path string, offset uint64) (io.ReadCloser, error) + WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error + ResumeWritePosition(path string) (uint64, error) + List(prefix string) ([]string, error) + Move(sourcePath string, destPath string) error + Delete(path string) error +} + +type PathNotFoundError struct { + Path string +} + +func (err PathNotFoundError) Error() string { + return fmt.Sprintf("Path not found: %s", err.Path) +} + +type InvalidOffsetError struct { + Path string + Offset uint64 +} + +func (err InvalidOffsetError) Error() string { + return fmt.Sprintf("Invalid offset: %d for path: %s", err.Offset, err.Path) +} diff --git a/storagedriver/testsuites/testsuites.go b/storagedriver/testsuites/testsuites.go new file mode 100644 index 000000000..7ca196d6e --- /dev/null +++ b/storagedriver/testsuites/testsuites.go @@ -0,0 +1,353 @@ +package testsuites + +import ( + "bytes" + "io/ioutil" + "math/rand" + "path" + "sort" + "testing" + + "github.com/docker/docker-registry/storagedriver" + "github.com/docker/docker-registry/storagedriver/ipc" + + . "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner +func Test(t *testing.T) { TestingT(t) } + +func RegisterInProcessSuite(driverConstructor DriverConstructor) { + Suite(&DriverSuite{ + Constructor: driverConstructor, + }) +} + +func RegisterIPCSuite(driverName string, ipcParams map[string]string) { + suite := &DriverSuite{ + Constructor: func() (storagedriver.StorageDriver, error) { + d, err := ipc.NewDriverClient(driverName, ipcParams) + if err != nil { + return nil, err + } + err = d.Start() + if err != nil { + return nil, err + } + return d, nil + }, + } + suite.Teardown = func() error { + driverClient := suite.StorageDriver.(*ipc.StorageDriverClient) + return driverClient.Stop() + } + Suite(suite) +} + +type DriverConstructor func() (storagedriver.StorageDriver, error) +type DriverTeardown func() error + +type DriverSuite struct { + Constructor DriverConstructor + Teardown DriverTeardown + storagedriver.StorageDriver +} + +type TestDriverConfig struct { + name string + params map[string]string +} + +func (suite *DriverSuite) SetUpSuite(c *C) { + d, err := suite.Constructor() + c.Assert(err, IsNil) + suite.StorageDriver = d +} + +func (suite *DriverSuite) TearDownSuite(c *C) { + if suite.Teardown != nil { + err := suite.Teardown() + c.Assert(err, IsNil) + } +} + +func (suite *DriverSuite) TestWriteRead1(c *C) { + filename := randomString(32) + contents := []byte("a") + suite.writeReadCompare(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteRead2(c *C) { + filename := randomString(32) + contents := []byte("\xc3\x9f") + suite.writeReadCompare(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteRead3(c *C) { + filename := randomString(32) + contents := []byte(randomString(32)) + suite.writeReadCompare(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteRead4(c *C) { + filename := randomString(32) + contents := []byte(randomString(1024 * 1024)) + suite.writeReadCompare(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestReadNonexistent(c *C) { + filename := randomString(32) + _, err := suite.StorageDriver.GetContent(filename) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestWriteReadStreams1(c *C) { + filename := randomString(32) + contents := []byte("a") + suite.writeReadCompareStreams(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteReadStreams2(c *C) { + filename := randomString(32) + contents := []byte("\xc3\x9f") + suite.writeReadCompareStreams(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteReadStreams3(c *C) { + filename := randomString(32) + contents := []byte(randomString(32)) + suite.writeReadCompareStreams(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestWriteReadStreams4(c *C) { + filename := randomString(32) + contents := []byte(randomString(1024 * 1024)) + suite.writeReadCompareStreams(c, filename, contents, contents) +} + +func (suite *DriverSuite) TestContinueStreamAppend(c *C) { + filename := randomString(32) + + chunkSize := uint64(32) + + contentsChunk1 := []byte(randomString(chunkSize)) + contentsChunk2 := []byte(randomString(chunkSize)) + contentsChunk3 := []byte(randomString(chunkSize)) + + fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...) + + err := suite.StorageDriver.WriteStream(filename, 0, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(contentsChunk1))) + c.Assert(err, IsNil) + + offset, err := suite.StorageDriver.ResumeWritePosition(filename) + c.Assert(err, IsNil) + if offset > chunkSize { + c.Fatalf("Offset too large, %d > %d", offset, chunkSize) + } + err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize]))) + c.Assert(err, IsNil) + + offset, err = suite.StorageDriver.ResumeWritePosition(filename) + c.Assert(err, IsNil) + if offset > 2*chunkSize { + c.Fatalf("Offset too large, %d > %d", offset, 2*chunkSize) + } + + err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:]))) + c.Assert(err, IsNil) + + received, err := suite.StorageDriver.GetContent(filename) + c.Assert(err, IsNil) + c.Assert(received, DeepEquals, fullContents) + + offset, err = suite.StorageDriver.ResumeWritePosition(filename) + c.Assert(err, IsNil) + c.Assert(offset, Equals, uint64(3*chunkSize)) +} + +func (suite *DriverSuite) TestReadStreamWithOffset(c *C) { + filename := randomString(32) + + chunkSize := uint64(32) + + contentsChunk1 := []byte(randomString(chunkSize)) + contentsChunk2 := []byte(randomString(chunkSize)) + contentsChunk3 := []byte(randomString(chunkSize)) + + err := suite.StorageDriver.PutContent(filename, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)) + c.Assert(err, IsNil) + + reader, err := suite.StorageDriver.ReadStream(filename, 0) + c.Assert(err, IsNil) + defer reader.Close() + + readContents, err := ioutil.ReadAll(reader) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)) + + reader, err = suite.StorageDriver.ReadStream(filename, chunkSize) + c.Assert(err, IsNil) + defer reader.Close() + + readContents, err = ioutil.ReadAll(reader) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, append(contentsChunk2, contentsChunk3...)) + + reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*2) + c.Assert(err, IsNil) + defer reader.Close() + + readContents, err = ioutil.ReadAll(reader) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, contentsChunk3) + + reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3) + c.Assert(err, IsNil) + defer reader.Close() + + readContents, err = ioutil.ReadAll(reader) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, []byte{}) +} + +func (suite *DriverSuite) TestReadNonexistentStream(c *C) { + filename := randomString(32) + _, err := suite.StorageDriver.ReadStream(filename, 0) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestList(c *C) { + rootDirectory := randomString(uint64(8 + rand.Intn(8))) + parentDirectory := rootDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + childFiles := make([]string, 50) + for i := 0; i < len(childFiles); i++ { + childFile := parentDirectory + "/" + randomString(uint64(8+rand.Intn(8))) + childFiles[i] = childFile + err := suite.StorageDriver.PutContent(childFile, []byte(randomString(32))) + c.Assert(err, IsNil) + } + sort.Strings(childFiles) + + keys, err := suite.StorageDriver.List(rootDirectory) + c.Assert(err, IsNil) + c.Assert(keys, DeepEquals, []string{parentDirectory}) + + keys, err = suite.StorageDriver.List(parentDirectory) + c.Assert(err, IsNil) + + sort.Strings(keys) + c.Assert(keys, DeepEquals, childFiles) +} + +func (suite *DriverSuite) TestMove(c *C) { + contents := []byte(randomString(32)) + sourcePath := randomString(32) + destPath := randomString(32) + + err := suite.StorageDriver.PutContent(sourcePath, contents) + c.Assert(err, IsNil) + + err = suite.StorageDriver.Move(sourcePath, destPath) + c.Assert(err, IsNil) + + received, err := suite.StorageDriver.GetContent(destPath) + c.Assert(err, IsNil) + c.Assert(received, DeepEquals, contents) + + _, err = suite.StorageDriver.GetContent(sourcePath) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestMoveNonexistent(c *C) { + sourcePath := randomString(32) + destPath := randomString(32) + + err := suite.StorageDriver.Move(sourcePath, destPath) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestRemove(c *C) { + filename := randomString(32) + contents := []byte(randomString(32)) + + err := suite.StorageDriver.PutContent(filename, contents) + c.Assert(err, IsNil) + + err = suite.StorageDriver.Delete(filename) + c.Assert(err, IsNil) + + _, err = suite.StorageDriver.GetContent(filename) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestRemoveNonexistent(c *C) { + filename := randomString(32) + err := suite.StorageDriver.Delete(filename) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) TestRemoveFolder(c *C) { + dirname := randomString(32) + filename1 := randomString(32) + filename2 := randomString(32) + contents := []byte(randomString(32)) + + err := suite.StorageDriver.PutContent(path.Join(dirname, filename1), contents) + c.Assert(err, IsNil) + + err = suite.StorageDriver.PutContent(path.Join(dirname, filename2), contents) + c.Assert(err, IsNil) + + err = suite.StorageDriver.Delete(dirname) + c.Assert(err, IsNil) + + _, err = suite.StorageDriver.GetContent(path.Join(dirname, filename1)) + c.Assert(err, NotNil) + + _, err = suite.StorageDriver.GetContent(path.Join(dirname, filename2)) + c.Assert(err, NotNil) +} + +func (suite *DriverSuite) writeReadCompare(c *C, filename string, contents, expected []byte) { + err := suite.StorageDriver.PutContent(filename, contents) + c.Assert(err, IsNil) + + readContents, err := suite.StorageDriver.GetContent(filename) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, contents) + + err = suite.StorageDriver.Delete(filename) + c.Assert(err, IsNil) +} + +func (suite *DriverSuite) writeReadCompareStreams(c *C, filename string, contents, expected []byte) { + err := suite.StorageDriver.WriteStream(filename, 0, uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents))) + c.Assert(err, IsNil) + + reader, err := suite.StorageDriver.ReadStream(filename, 0) + c.Assert(err, IsNil) + defer reader.Close() + + readContents, err := ioutil.ReadAll(reader) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, contents) + + err = suite.StorageDriver.Delete(filename) + c.Assert(err, IsNil) +} + +var pathChars = []byte("abcdefghijklmnopqrstuvwxyz") + +func randomString(length uint64) string { + b := make([]byte, length) + for i := range b { + b[i] = pathChars[rand.Intn(len(pathChars))] + } + return string(b) +}