forked from TrueCloudLab/distribution
Merge pull request #643 from BrianBland/next-generation
Adds storage driver interface, tests, and three basic implementations
This commit is contained in:
commit
df7eed3a2c
17 changed files with 1933 additions and 0 deletions
5
.travis.yml
Normal file
5
.travis.yml
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
language: go
|
||||||
|
|
||||||
|
go:
|
||||||
|
- 1.3
|
||||||
|
- tip
|
21
main/storagedriver/filesystem/filesystem.go
Normal file
21
main/storagedriver/filesystem/filesystem.go
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry/storagedriver/filesystem"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/ipc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// An out-of-process filesystem driver, intended to be run by ipc.NewDriverClient
|
||||||
|
func main() {
|
||||||
|
parametersBytes := []byte(os.Args[1])
|
||||||
|
var parameters map[string]string
|
||||||
|
err := json.Unmarshal(parametersBytes, ¶meters)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ipc.StorageDriverServer(filesystem.FromParameters(parameters))
|
||||||
|
}
|
12
main/storagedriver/inmemory/inmemory.go
Normal file
12
main/storagedriver/inmemory/inmemory.go
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/docker/docker-registry/storagedriver/inmemory"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/ipc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// An out-of-process inmemory driver, intended to be run by ipc.NewDriverClient
|
||||||
|
// This exists primarily for example and testing purposes
|
||||||
|
func main() {
|
||||||
|
ipc.StorageDriverServer(inmemory.New())
|
||||||
|
}
|
26
main/storagedriver/s3/s3.go
Normal file
26
main/storagedriver/s3/s3.go
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry/storagedriver/ipc"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/s3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// An out-of-process S3 driver, intended to be run by ipc.NewDriverClient
|
||||||
|
func main() {
|
||||||
|
parametersBytes := []byte(os.Args[1])
|
||||||
|
var parameters map[string]string
|
||||||
|
err := json.Unmarshal(parametersBytes, ¶meters)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
driver, err := s3.FromParameters(parameters)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ipc.StorageDriverServer(driver)
|
||||||
|
}
|
47
storagedriver/README.md
Normal file
47
storagedriver/README.md
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
Docker-Registry Storage Driver
|
||||||
|
==============================
|
||||||
|
|
||||||
|
This document describes the registry storage driver model, implementation, and explains how to contribute new storage drivers.
|
||||||
|
|
||||||
|
Provided Drivers
|
||||||
|
================
|
||||||
|
|
||||||
|
This storage driver package comes bundled with three default drivers.
|
||||||
|
|
||||||
|
1. filesystem: A local storage driver configured to use a directory tree in the local filesystem.
|
||||||
|
2. s3: A driver storing objects in an Amazon Simple Storage Solution (S3) bucket.
|
||||||
|
3. inmemory: A temporary storage driver using a local inmemory map. This exists solely for reference and testing.
|
||||||
|
|
||||||
|
Storage Driver API
|
||||||
|
==================
|
||||||
|
|
||||||
|
The storage driver API is designed to model a filesystem-like key/value storage in a manner abstract enough to support a range of drivers from the local filesystem to Amazon S3 or other distributed object storage systems.
|
||||||
|
|
||||||
|
Storage drivers are required to implement the `storagedriver.StorageDriver` interface provided in `storagedriver.go`, which includes methods for reading, writing, and deleting content, as well as listing child objects of a specified prefix key.
|
||||||
|
|
||||||
|
Storage drivers are intended (but not required) to be written in go, providing compile-time validation of the `storagedriver.StorageDriver` interface, although an IPC driver wrapper means that it is not required for drivers to be included in the compiled registry. The `storagedriver/ipc` package provides a client/server protocol for running storage drivers provided in external executables as a managed child server process.
|
||||||
|
|
||||||
|
Driver Selection and Configuration
|
||||||
|
==================================
|
||||||
|
|
||||||
|
The preferred method of selecting a storage driver is using the `StorageDriverFactory` interface in the `storagedriver/factory` package. These factories provide a common interface for constructing storage drivers with a parameters map. The factory model is based off of the [Register](http://golang.org/pkg/database/sql/#Register) and [Open](http://golang.org/pkg/database/sql/#Open) methods in the builtin [database/sql](http://golang.org/pkg/database/sql) package.
|
||||||
|
|
||||||
|
Storage driver factories may be registered by name using the `factory.Register` method, and then later invoked by calling `factory.Create` with a driver name and parameters map. If no driver is registered with the given name, this factory will attempt to find an executable storage driver with the same name and return an IPC storage driver wrapper managing the driver subprocess. If no such storage driver can be found, `factory.Create` will return an `InvalidStorageDriverError`.
|
||||||
|
|
||||||
|
Driver Contribution
|
||||||
|
===================
|
||||||
|
|
||||||
|
## Writing new storage drivers
|
||||||
|
To create a valid storage driver, one must implement the `storagedriver.StorageDriver` interface and make sure to expose this driver via the factory system and as a distributable IPC server executable.
|
||||||
|
|
||||||
|
### In-process drivers
|
||||||
|
Storage drivers should call `factory.Register` with their driver name in an `init` method, allowing callers of `factory.New` to construct instances of this driver without requiring modification of imports throughout the codebase.
|
||||||
|
|
||||||
|
### Out-of-process drivers
|
||||||
|
As many users will run the registry as a pre-constructed docker container, storage drivers should also be distributable as IPC server executables. Drivers written in go should model the main method provided in `main/storagedriver/filesystem/filesystem.go`. Parameters to IPC drivers will be provided as a JSON-serialized map in the first argument to the process. These parameters should be validated and then a blocking call to `ipc.StorageDriverServer` should be made with a new storage driver.
|
||||||
|
|
||||||
|
## Testing
|
||||||
|
Storage driver test suites are provided in `storagedriver/testsuites/testsuites.go` and may be used for any storage driver written in go. Two methods are provided for registering test suites, `RegisterInProcessSuite` and `RegisterIPCSuite`, which run the same set of tests for the driver imported or managed over IPC respectively.
|
||||||
|
|
||||||
|
## Drivers written in other languages
|
||||||
|
Although storage drivers are strongly recommended to be written in go for consistency, compile-time validation, and support, the IPC framework allows for a level of language-agnosticism. Non-go drivers must implement the storage driver protocol by mimicing StorageDriverServer in `storagedriver/ipc/server.go`. As the IPC framework is a layer on top of [docker/libchan](https://github.com/docker/libchan), this currently limits language support to Java via [ndeloof/chan](https://github.com/ndeloof/jchan) and Javascript via [GraftJS/jschan](https://github.com/GraftJS/jschan), although contributions to the libchan project are welcome.
|
65
storagedriver/factory/factory.go
Normal file
65
storagedriver/factory/factory.go
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
package factory
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry/storagedriver"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/ipc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// driverFactories stores an internal mapping between storage driver names and their respective
|
||||||
|
// factories
|
||||||
|
var driverFactories = make(map[string]StorageDriverFactory)
|
||||||
|
|
||||||
|
// StorageDriverFactory is a factory interface for creating storagedriver.StorageDriver interfaces
|
||||||
|
// Storage drivers should call Register() with a factory to make the driver available by name
|
||||||
|
type StorageDriverFactory interface {
|
||||||
|
// Create returns a new storagedriver.StorageDriver with the given parameters
|
||||||
|
// Parameters will vary by driver and may be ignored
|
||||||
|
// Each parameter key must only consist of lowercase letters and numbers
|
||||||
|
Create(parameters map[string]string) (storagedriver.StorageDriver, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register makes a storage driver available by the provided name.
|
||||||
|
// If Register is called twice with the same name or if driver factory is nil, it panics.
|
||||||
|
func Register(name string, factory StorageDriverFactory) {
|
||||||
|
if factory == nil {
|
||||||
|
panic("Must not provide nil StorageDriverFactory")
|
||||||
|
}
|
||||||
|
_, registered := driverFactories[name]
|
||||||
|
if registered {
|
||||||
|
panic(fmt.Sprintf("StorageDriverFactory named %s already registered", name))
|
||||||
|
}
|
||||||
|
|
||||||
|
driverFactories[name] = factory
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new storagedriver.StorageDriver with the given name and parameters
|
||||||
|
// To run in-process, the StorageDriverFactory must first be registered with the given name
|
||||||
|
// If no in-process drivers are found with the given name, this attempts to create an IPC driver
|
||||||
|
// If no in-process or external drivers are found, an InvalidStorageDriverError is returned
|
||||||
|
func Create(name string, parameters map[string]string) (storagedriver.StorageDriver, error) {
|
||||||
|
driverFactory, ok := driverFactories[name]
|
||||||
|
if !ok {
|
||||||
|
// No registered StorageDriverFactory found, try ipc
|
||||||
|
driverClient, err := ipc.NewDriverClient(name, parameters)
|
||||||
|
if err != nil {
|
||||||
|
return nil, InvalidStorageDriverError{name}
|
||||||
|
}
|
||||||
|
err = driverClient.Start()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return driverClient, nil
|
||||||
|
}
|
||||||
|
return driverFactory.Create(parameters)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InvalidStorageDriverError records an attempt to construct an unregistered storage driver
|
||||||
|
type InvalidStorageDriverError struct {
|
||||||
|
Name string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (err InvalidStorageDriverError) Error() string {
|
||||||
|
return fmt.Sprintf("StorageDriver not registered: %s", err.Name)
|
||||||
|
}
|
208
storagedriver/filesystem/filesystem.go
Normal file
208
storagedriver/filesystem/filesystem.go
Normal file
|
@ -0,0 +1,208 @@
|
||||||
|
package filesystem
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry/storagedriver"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/factory"
|
||||||
|
)
|
||||||
|
|
||||||
|
const DriverName = "filesystem"
|
||||||
|
const DefaultRootDirectory = "/tmp/registry/storage"
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
factory.Register(DriverName, &filesystemDriverFactory{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// filesystemDriverFactory implements the factory.StorageDriverFactory interface
|
||||||
|
type filesystemDriverFactory struct{}
|
||||||
|
|
||||||
|
func (factory *filesystemDriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) {
|
||||||
|
return FromParameters(parameters), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FilesystemDriver is a storagedriver.StorageDriver implementation backed by a local filesystem
|
||||||
|
// All provided paths will be subpaths of the RootDirectory
|
||||||
|
type FilesystemDriver struct {
|
||||||
|
rootDirectory string
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromParameters constructs a new FilesystemDriver with a given parameters map
|
||||||
|
// Optional Parameters:
|
||||||
|
// - rootdirectory
|
||||||
|
func FromParameters(parameters map[string]string) *FilesystemDriver {
|
||||||
|
var rootDirectory = DefaultRootDirectory
|
||||||
|
if parameters != nil {
|
||||||
|
rootDir, ok := parameters["rootdirectory"]
|
||||||
|
if ok {
|
||||||
|
rootDirectory = rootDir
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return New(rootDirectory)
|
||||||
|
}
|
||||||
|
|
||||||
|
// New constructs a new FilesystemDriver with a given rootDirectory
|
||||||
|
func New(rootDirectory string) *FilesystemDriver {
|
||||||
|
return &FilesystemDriver{rootDirectory}
|
||||||
|
}
|
||||||
|
|
||||||
|
// subPath returns the absolute path of a key within the FilesystemDriver's storage
|
||||||
|
func (d *FilesystemDriver) subPath(subPath string) string {
|
||||||
|
return path.Join(d.rootDirectory, subPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implement the storagedriver.StorageDriver interface
|
||||||
|
|
||||||
|
func (d *FilesystemDriver) GetContent(path string) ([]byte, error) {
|
||||||
|
contents, err := ioutil.ReadFile(d.subPath(path))
|
||||||
|
if err != nil {
|
||||||
|
return nil, storagedriver.PathNotFoundError{path}
|
||||||
|
}
|
||||||
|
return contents, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *FilesystemDriver) PutContent(subPath string, contents []byte) error {
|
||||||
|
fullPath := d.subPath(subPath)
|
||||||
|
parentDir := path.Dir(fullPath)
|
||||||
|
err := os.MkdirAll(parentDir, 0755)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ioutil.WriteFile(fullPath, contents, 0644)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *FilesystemDriver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
|
||||||
|
file, err := os.OpenFile(d.subPath(path), os.O_RDONLY, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
seekPos, err := file.Seek(int64(offset), os.SEEK_SET)
|
||||||
|
if err != nil {
|
||||||
|
file.Close()
|
||||||
|
return nil, err
|
||||||
|
} else if seekPos < int64(offset) {
|
||||||
|
file.Close()
|
||||||
|
return nil, storagedriver.InvalidOffsetError{path, offset}
|
||||||
|
}
|
||||||
|
|
||||||
|
return file, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *FilesystemDriver) WriteStream(subPath string, offset, size uint64, reader io.ReadCloser) error {
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
resumableOffset, err := d.ResumeWritePosition(subPath)
|
||||||
|
if _, pathNotFound := err.(storagedriver.PathNotFoundError); err != nil && !pathNotFound {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if offset > resumableOffset {
|
||||||
|
return storagedriver.InvalidOffsetError{subPath, offset}
|
||||||
|
}
|
||||||
|
|
||||||
|
fullPath := d.subPath(subPath)
|
||||||
|
parentDir := path.Dir(fullPath)
|
||||||
|
err = os.MkdirAll(parentDir, 0755)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var file *os.File
|
||||||
|
if offset == 0 {
|
||||||
|
file, err = os.Create(fullPath)
|
||||||
|
} else {
|
||||||
|
file, err = os.OpenFile(fullPath, os.O_WRONLY|os.O_APPEND, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
buf := make([]byte, 32*1024)
|
||||||
|
for {
|
||||||
|
bytesRead, er := reader.Read(buf)
|
||||||
|
if bytesRead > 0 {
|
||||||
|
bytesWritten, ew := file.WriteAt(buf[0:bytesRead], int64(offset))
|
||||||
|
if bytesWritten > 0 {
|
||||||
|
offset += uint64(bytesWritten)
|
||||||
|
}
|
||||||
|
if ew != nil {
|
||||||
|
err = ew
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if bytesRead != bytesWritten {
|
||||||
|
err = io.ErrShortWrite
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if er == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if er != nil {
|
||||||
|
err = er
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *FilesystemDriver) ResumeWritePosition(subPath string) (uint64, error) {
|
||||||
|
fullPath := d.subPath(subPath)
|
||||||
|
|
||||||
|
fileInfo, err := os.Stat(fullPath)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return 0, err
|
||||||
|
} else if err != nil {
|
||||||
|
return 0, storagedriver.PathNotFoundError{subPath}
|
||||||
|
}
|
||||||
|
return uint64(fileInfo.Size()), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *FilesystemDriver) List(subPath string) ([]string, error) {
|
||||||
|
subPath = strings.TrimRight(subPath, "/")
|
||||||
|
fullPath := d.subPath(subPath)
|
||||||
|
|
||||||
|
dir, err := os.Open(fullPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
fileNames, err := dir.Readdirnames(0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
keys := make([]string, 0, len(fileNames))
|
||||||
|
for _, fileName := range fileNames {
|
||||||
|
keys = append(keys, path.Join(subPath, fileName))
|
||||||
|
}
|
||||||
|
|
||||||
|
return keys, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *FilesystemDriver) Move(sourcePath string, destPath string) error {
|
||||||
|
err := os.Rename(d.subPath(sourcePath), d.subPath(destPath))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *FilesystemDriver) Delete(subPath string) error {
|
||||||
|
fullPath := d.subPath(subPath)
|
||||||
|
|
||||||
|
_, err := os.Stat(fullPath)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return err
|
||||||
|
} else if err != nil {
|
||||||
|
return storagedriver.PathNotFoundError{subPath}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = os.RemoveAll(fullPath)
|
||||||
|
return err
|
||||||
|
}
|
24
storagedriver/filesystem/filesystem_test.go
Normal file
24
storagedriver/filesystem/filesystem_test.go
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
package filesystem
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry/storagedriver"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/testsuites"
|
||||||
|
. "gopkg.in/check.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Hook up gocheck into the "go test" runner.
|
||||||
|
func Test(t *testing.T) { TestingT(t) }
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
rootDirectory := "/tmp/driver"
|
||||||
|
os.RemoveAll(rootDirectory)
|
||||||
|
|
||||||
|
filesystemDriverConstructor := func() (storagedriver.StorageDriver, error) {
|
||||||
|
return New(rootDirectory), nil
|
||||||
|
}
|
||||||
|
testsuites.RegisterInProcessSuite(filesystemDriverConstructor, testsuites.NeverSkip)
|
||||||
|
testsuites.RegisterIPCSuite(DriverName, map[string]string{"rootdirectory": rootDirectory}, testsuites.NeverSkip)
|
||||||
|
}
|
166
storagedriver/inmemory/inmemory.go
Normal file
166
storagedriver/inmemory/inmemory.go
Normal file
|
@ -0,0 +1,166 @@
|
||||||
|
package inmemory
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry/storagedriver"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/factory"
|
||||||
|
)
|
||||||
|
|
||||||
|
const DriverName = "inmemory"
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
factory.Register(DriverName, &inMemoryDriverFactory{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// inMemoryDriverFacotry implements the factory.StorageDriverFactory interface
|
||||||
|
type inMemoryDriverFactory struct{}
|
||||||
|
|
||||||
|
func (factory *inMemoryDriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) {
|
||||||
|
return New(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// InMemoryDriver is a storagedriver.StorageDriver implementation backed by a local map
|
||||||
|
// Intended solely for example and testing purposes
|
||||||
|
type InMemoryDriver struct {
|
||||||
|
storage map[string][]byte
|
||||||
|
mutex sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// New constructs a new InMemoryDriver
|
||||||
|
func New() *InMemoryDriver {
|
||||||
|
return &InMemoryDriver{storage: make(map[string][]byte)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implement the storagedriver.StorageDriver interface
|
||||||
|
|
||||||
|
func (d *InMemoryDriver) GetContent(path string) ([]byte, error) {
|
||||||
|
d.mutex.RLock()
|
||||||
|
defer d.mutex.RUnlock()
|
||||||
|
contents, ok := d.storage[path]
|
||||||
|
if !ok {
|
||||||
|
return nil, storagedriver.PathNotFoundError{path}
|
||||||
|
}
|
||||||
|
return contents, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *InMemoryDriver) PutContent(path string, contents []byte) error {
|
||||||
|
d.mutex.Lock()
|
||||||
|
defer d.mutex.Unlock()
|
||||||
|
d.storage[path] = contents
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *InMemoryDriver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
|
||||||
|
d.mutex.RLock()
|
||||||
|
defer d.mutex.RUnlock()
|
||||||
|
contents, err := d.GetContent(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if len(contents) < int(offset) {
|
||||||
|
return nil, storagedriver.InvalidOffsetError{path, offset}
|
||||||
|
}
|
||||||
|
|
||||||
|
src := contents[offset:]
|
||||||
|
buf := make([]byte, len(src))
|
||||||
|
copy(buf, src)
|
||||||
|
return ioutil.NopCloser(bytes.NewReader(buf)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *InMemoryDriver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
|
||||||
|
defer reader.Close()
|
||||||
|
d.mutex.RLock()
|
||||||
|
defer d.mutex.RUnlock()
|
||||||
|
|
||||||
|
resumableOffset, err := d.ResumeWritePosition(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if offset > resumableOffset {
|
||||||
|
return storagedriver.InvalidOffsetError{path, offset}
|
||||||
|
}
|
||||||
|
|
||||||
|
contents, err := ioutil.ReadAll(reader)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if offset > 0 {
|
||||||
|
contents = append(d.storage[path][0:offset], contents...)
|
||||||
|
}
|
||||||
|
|
||||||
|
d.storage[path] = contents
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *InMemoryDriver) ResumeWritePosition(path string) (uint64, error) {
|
||||||
|
d.mutex.RLock()
|
||||||
|
defer d.mutex.RUnlock()
|
||||||
|
contents, ok := d.storage[path]
|
||||||
|
if !ok {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return uint64(len(contents)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *InMemoryDriver) List(path string) ([]string, error) {
|
||||||
|
subPathMatcher, err := regexp.Compile(fmt.Sprintf("^%s/[^/]+", path))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
d.mutex.RLock()
|
||||||
|
defer d.mutex.RUnlock()
|
||||||
|
// we use map to collect uniq keys
|
||||||
|
keySet := make(map[string]struct{})
|
||||||
|
for k := range d.storage {
|
||||||
|
if key := subPathMatcher.FindString(k); key != "" {
|
||||||
|
keySet[key] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
keys := make([]string, 0, len(keySet))
|
||||||
|
for k := range keySet {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
return keys, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *InMemoryDriver) Move(sourcePath string, destPath string) error {
|
||||||
|
d.mutex.Lock()
|
||||||
|
defer d.mutex.Unlock()
|
||||||
|
contents, ok := d.storage[sourcePath]
|
||||||
|
if !ok {
|
||||||
|
return storagedriver.PathNotFoundError{sourcePath}
|
||||||
|
}
|
||||||
|
d.storage[destPath] = contents
|
||||||
|
delete(d.storage, sourcePath)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *InMemoryDriver) Delete(path string) error {
|
||||||
|
d.mutex.Lock()
|
||||||
|
defer d.mutex.Unlock()
|
||||||
|
subPaths := make([]string, 0)
|
||||||
|
for k := range d.storage {
|
||||||
|
if strings.HasPrefix(k, path) {
|
||||||
|
subPaths = append(subPaths, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(subPaths) == 0 {
|
||||||
|
return storagedriver.PathNotFoundError{path}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, subPath := range subPaths {
|
||||||
|
delete(d.storage, subPath)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
20
storagedriver/inmemory/inmemory_test.go
Normal file
20
storagedriver/inmemory/inmemory_test.go
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
package inmemory
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry/storagedriver"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/testsuites"
|
||||||
|
. "gopkg.in/check.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Hook up gocheck into the "go test" runner.
|
||||||
|
func Test(t *testing.T) { TestingT(t) }
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
inmemoryDriverConstructor := func() (storagedriver.StorageDriver, error) {
|
||||||
|
return New(), nil
|
||||||
|
}
|
||||||
|
testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip)
|
||||||
|
testsuites.RegisterIPCSuite(DriverName, nil, testsuites.NeverSkip)
|
||||||
|
}
|
307
storagedriver/ipc/client.go
Normal file
307
storagedriver/ipc/client.go
Normal file
|
@ -0,0 +1,307 @@
|
||||||
|
package ipc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/docker/libchan"
|
||||||
|
"github.com/docker/libchan/spdy"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StorageDriverClient is a storagedriver.StorageDriver implementation using a managed child process
|
||||||
|
// communicating over IPC using libchan with a unix domain socket
|
||||||
|
type StorageDriverClient struct {
|
||||||
|
subprocess *exec.Cmd
|
||||||
|
socket *os.File
|
||||||
|
transport *spdy.Transport
|
||||||
|
sender libchan.Sender
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDriverClient constructs a new out-of-process storage driver using the driver name and
|
||||||
|
// configuration parameters
|
||||||
|
// A user must call Start on this driver client before remote method calls can be made
|
||||||
|
//
|
||||||
|
// Looks for drivers in the following locations in order:
|
||||||
|
// - Storage drivers directory (to be determined, yet not implemented)
|
||||||
|
// - $GOPATH/bin
|
||||||
|
// - $PATH
|
||||||
|
func NewDriverClient(name string, parameters map[string]string) (*StorageDriverClient, error) {
|
||||||
|
paramsBytes, err := json.Marshal(parameters)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
driverPath := os.ExpandEnv(path.Join("$GOPATH", "bin", name))
|
||||||
|
if _, err := os.Stat(driverPath); os.IsNotExist(err) {
|
||||||
|
driverPath = path.Join(path.Dir(os.Args[0]), name)
|
||||||
|
}
|
||||||
|
if _, err := os.Stat(driverPath); os.IsNotExist(err) {
|
||||||
|
driverPath, err = exec.LookPath(name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
command := exec.Command(driverPath, string(paramsBytes))
|
||||||
|
|
||||||
|
return &StorageDriverClient{
|
||||||
|
subprocess: command,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start starts the designated child process storage driver and binds a socket to this process for
|
||||||
|
// IPC method calls
|
||||||
|
func (driver *StorageDriverClient) Start() error {
|
||||||
|
fileDescriptors, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
childSocket := os.NewFile(uintptr(fileDescriptors[0]), "childSocket")
|
||||||
|
parentSocket := os.NewFile(uintptr(fileDescriptors[1]), "parentSocket")
|
||||||
|
|
||||||
|
driver.subprocess.Stdout = os.Stdout
|
||||||
|
driver.subprocess.Stderr = os.Stderr
|
||||||
|
driver.subprocess.ExtraFiles = []*os.File{childSocket}
|
||||||
|
|
||||||
|
if err = driver.subprocess.Start(); err != nil {
|
||||||
|
parentSocket.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = childSocket.Close(); err != nil {
|
||||||
|
parentSocket.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
connection, err := net.FileConn(parentSocket)
|
||||||
|
if err != nil {
|
||||||
|
parentSocket.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
transport, err := spdy.NewClientTransport(connection)
|
||||||
|
if err != nil {
|
||||||
|
parentSocket.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sender, err := transport.NewSendChannel()
|
||||||
|
if err != nil {
|
||||||
|
transport.Close()
|
||||||
|
parentSocket.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
driver.socket = parentSocket
|
||||||
|
driver.transport = transport
|
||||||
|
driver.sender = sender
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the child process storage driver
|
||||||
|
// storagedriver.StorageDriver methods called after Stop will fail
|
||||||
|
func (driver *StorageDriverClient) Stop() error {
|
||||||
|
closeSenderErr := driver.sender.Close()
|
||||||
|
closeTransportErr := driver.transport.Close()
|
||||||
|
closeSocketErr := driver.socket.Close()
|
||||||
|
killErr := driver.subprocess.Process.Kill()
|
||||||
|
|
||||||
|
if closeSenderErr != nil {
|
||||||
|
return closeSenderErr
|
||||||
|
} else if closeTransportErr != nil {
|
||||||
|
return closeTransportErr
|
||||||
|
} else if closeSocketErr != nil {
|
||||||
|
return closeSocketErr
|
||||||
|
}
|
||||||
|
return killErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implement the storagedriver.StorageDriver interface over IPC
|
||||||
|
|
||||||
|
func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
|
||||||
|
receiver, remoteSender := libchan.Pipe()
|
||||||
|
|
||||||
|
params := map[string]interface{}{"Path": path}
|
||||||
|
err := driver.sender.Send(&Request{Type: "GetContent", Parameters: params, ResponseChannel: remoteSender})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var response ReadStreamResponse
|
||||||
|
err = receiver.Receive(&response)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if response.Error != nil {
|
||||||
|
return nil, response.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
defer response.Reader.Close()
|
||||||
|
contents, err := ioutil.ReadAll(response.Reader)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return contents, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (driver *StorageDriverClient) PutContent(path string, contents []byte) error {
|
||||||
|
receiver, remoteSender := libchan.Pipe()
|
||||||
|
|
||||||
|
params := map[string]interface{}{"Path": path, "Reader": ioutil.NopCloser(bytes.NewReader(contents))}
|
||||||
|
err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var response WriteStreamResponse
|
||||||
|
err = receiver.Receive(&response)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if response.Error != nil {
|
||||||
|
return response.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
|
||||||
|
receiver, remoteSender := libchan.Pipe()
|
||||||
|
|
||||||
|
params := map[string]interface{}{"Path": path, "Offset": offset}
|
||||||
|
err := driver.sender.Send(&Request{Type: "ReadStream", Parameters: params, ResponseChannel: remoteSender})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var response ReadStreamResponse
|
||||||
|
err = receiver.Receive(&response)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if response.Error != nil {
|
||||||
|
return nil, response.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.Reader, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
|
||||||
|
receiver, remoteSender := libchan.Pipe()
|
||||||
|
|
||||||
|
params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": ioutil.NopCloser(reader)}
|
||||||
|
err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var response WriteStreamResponse
|
||||||
|
err = receiver.Receive(&response)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if response.Error != nil {
|
||||||
|
return response.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (driver *StorageDriverClient) ResumeWritePosition(path string) (uint64, error) {
|
||||||
|
receiver, remoteSender := libchan.Pipe()
|
||||||
|
|
||||||
|
params := map[string]interface{}{"Path": path}
|
||||||
|
err := driver.sender.Send(&Request{Type: "ResumeWritePosition", Parameters: params, ResponseChannel: remoteSender})
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var response ResumeWritePositionResponse
|
||||||
|
err = receiver.Receive(&response)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if response.Error != nil {
|
||||||
|
return 0, response.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.Position, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (driver *StorageDriverClient) List(path string) ([]string, error) {
|
||||||
|
receiver, remoteSender := libchan.Pipe()
|
||||||
|
|
||||||
|
params := map[string]interface{}{"Path": path}
|
||||||
|
err := driver.sender.Send(&Request{Type: "List", Parameters: params, ResponseChannel: remoteSender})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var response ListResponse
|
||||||
|
err = receiver.Receive(&response)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if response.Error != nil {
|
||||||
|
return nil, response.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.Keys, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (driver *StorageDriverClient) Move(sourcePath string, destPath string) error {
|
||||||
|
receiver, remoteSender := libchan.Pipe()
|
||||||
|
|
||||||
|
params := map[string]interface{}{"SourcePath": sourcePath, "DestPath": destPath}
|
||||||
|
err := driver.sender.Send(&Request{Type: "Move", Parameters: params, ResponseChannel: remoteSender})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var response MoveResponse
|
||||||
|
err = receiver.Receive(&response)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if response.Error != nil {
|
||||||
|
return response.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (driver *StorageDriverClient) Delete(path string) error {
|
||||||
|
receiver, remoteSender := libchan.Pipe()
|
||||||
|
|
||||||
|
params := map[string]interface{}{"Path": path}
|
||||||
|
err := driver.sender.Send(&Request{Type: "Delete", Parameters: params, ResponseChannel: remoteSender})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var response DeleteResponse
|
||||||
|
err = receiver.Receive(&response)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if response.Error != nil {
|
||||||
|
return response.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
72
storagedriver/ipc/ipc.go
Normal file
72
storagedriver/ipc/ipc.go
Normal file
|
@ -0,0 +1,72 @@
|
||||||
|
package ipc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/docker/libchan"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Request defines a remote method call request
|
||||||
|
// A return value struct is to be sent over the ResponseChannel
|
||||||
|
type Request struct {
|
||||||
|
Type string
|
||||||
|
Parameters map[string]interface{}
|
||||||
|
ResponseChannel libchan.Sender
|
||||||
|
}
|
||||||
|
|
||||||
|
type responseError struct {
|
||||||
|
Type string
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResponseError wraps an error in a serializable struct containing the error's type and message
|
||||||
|
func ResponseError(err error) *responseError {
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &responseError{
|
||||||
|
Type: reflect.TypeOf(err).String(),
|
||||||
|
Message: err.Error(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (err *responseError) Error() string {
|
||||||
|
return fmt.Sprintf("%s: %s", err.Type, err.Message)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IPC method call response object definitions
|
||||||
|
|
||||||
|
// ReadStreamResponse is a response for a ReadStream request
|
||||||
|
type ReadStreamResponse struct {
|
||||||
|
Reader io.ReadCloser
|
||||||
|
Error *responseError
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteStreamResponse is a response for a WriteStream request
|
||||||
|
type WriteStreamResponse struct {
|
||||||
|
Error *responseError
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResumeWritePositionResponse is a response for a ResumeWritePosition request
|
||||||
|
type ResumeWritePositionResponse struct {
|
||||||
|
Position uint64
|
||||||
|
Error *responseError
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListResponse is a response for a List request
|
||||||
|
type ListResponse struct {
|
||||||
|
Keys []string
|
||||||
|
Error *responseError
|
||||||
|
}
|
||||||
|
|
||||||
|
// MoveResponse is a response for a Move request
|
||||||
|
type MoveResponse struct {
|
||||||
|
Error *responseError
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteResponse is a response for a Delete request
|
||||||
|
type DeleteResponse struct {
|
||||||
|
Error *responseError
|
||||||
|
}
|
168
storagedriver/ipc/server.go
Normal file
168
storagedriver/ipc/server.go
Normal file
|
@ -0,0 +1,168 @@
|
||||||
|
package ipc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry/storagedriver"
|
||||||
|
"github.com/docker/libchan"
|
||||||
|
"github.com/docker/libchan/spdy"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StorageDriverServer runs a new IPC server handling requests for the given
|
||||||
|
// storagedriver.StorageDriver
|
||||||
|
// This explicitly uses file descriptor 3 for IPC communication, as storage drivers are spawned in
|
||||||
|
// client.go
|
||||||
|
//
|
||||||
|
// To create a new out-of-process driver, create a main package which calls StorageDriverServer with
|
||||||
|
// a storagedriver.StorageDriver
|
||||||
|
func StorageDriverServer(driver storagedriver.StorageDriver) error {
|
||||||
|
childSocket := os.NewFile(3, "childSocket")
|
||||||
|
defer childSocket.Close()
|
||||||
|
conn, err := net.FileConn(childSocket)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
if transport, err := spdy.NewServerTransport(conn); err != nil {
|
||||||
|
panic(err)
|
||||||
|
} else {
|
||||||
|
for {
|
||||||
|
receiver, err := transport.WaitReceiveChannel()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
go receive(driver, receiver)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// receive receives new storagedriver.StorageDriver method requests and creates a new goroutine to
|
||||||
|
// handle each request
|
||||||
|
// Requests are expected to be of type ipc.Request as the parameters are unknown until the request
|
||||||
|
// type is deserialized
|
||||||
|
func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) {
|
||||||
|
for {
|
||||||
|
var request Request
|
||||||
|
err := receiver.Receive(&request)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
go handleRequest(driver, request)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleRequest handles storagedriver.StorageDriver method requests as defined in client.go
|
||||||
|
// Responds to requests using the Request.ResponseChannel
|
||||||
|
func handleRequest(driver storagedriver.StorageDriver, request Request) {
|
||||||
|
switch request.Type {
|
||||||
|
case "GetContent":
|
||||||
|
path, _ := request.Parameters["Path"].(string)
|
||||||
|
content, err := driver.GetContent(path)
|
||||||
|
var response ReadStreamResponse
|
||||||
|
if err != nil {
|
||||||
|
response = ReadStreamResponse{Error: ResponseError(err)}
|
||||||
|
} else {
|
||||||
|
response = ReadStreamResponse{Reader: ioutil.NopCloser(bytes.NewReader(content))}
|
||||||
|
}
|
||||||
|
err = request.ResponseChannel.Send(&response)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
case "PutContent":
|
||||||
|
path, _ := request.Parameters["Path"].(string)
|
||||||
|
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
|
||||||
|
contents, err := ioutil.ReadAll(reader)
|
||||||
|
defer reader.Close()
|
||||||
|
if err == nil {
|
||||||
|
err = driver.PutContent(path, contents)
|
||||||
|
}
|
||||||
|
response := WriteStreamResponse{
|
||||||
|
Error: ResponseError(err),
|
||||||
|
}
|
||||||
|
err = request.ResponseChannel.Send(&response)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
case "ReadStream":
|
||||||
|
path, _ := request.Parameters["Path"].(string)
|
||||||
|
// Depending on serialization method, Offset may be convereted to any int/uint type
|
||||||
|
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint()
|
||||||
|
reader, err := driver.ReadStream(path, offset)
|
||||||
|
var response ReadStreamResponse
|
||||||
|
if err != nil {
|
||||||
|
response = ReadStreamResponse{Error: ResponseError(err)}
|
||||||
|
} else {
|
||||||
|
response = ReadStreamResponse{Reader: ioutil.NopCloser(reader)}
|
||||||
|
}
|
||||||
|
err = request.ResponseChannel.Send(&response)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
case "WriteStream":
|
||||||
|
path, _ := request.Parameters["Path"].(string)
|
||||||
|
// Depending on serialization method, Offset may be convereted to any int/uint type
|
||||||
|
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint()
|
||||||
|
// Depending on serialization method, Size may be convereted to any int/uint type
|
||||||
|
size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(uint64(0))).Uint()
|
||||||
|
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
|
||||||
|
err := driver.WriteStream(path, offset, size, reader)
|
||||||
|
response := WriteStreamResponse{
|
||||||
|
Error: ResponseError(err),
|
||||||
|
}
|
||||||
|
err = request.ResponseChannel.Send(&response)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
case "ResumeWritePosition":
|
||||||
|
path, _ := request.Parameters["Path"].(string)
|
||||||
|
position, err := driver.ResumeWritePosition(path)
|
||||||
|
response := ResumeWritePositionResponse{
|
||||||
|
Position: position,
|
||||||
|
Error: ResponseError(err),
|
||||||
|
}
|
||||||
|
err = request.ResponseChannel.Send(&response)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
case "List":
|
||||||
|
path, _ := request.Parameters["Path"].(string)
|
||||||
|
keys, err := driver.List(path)
|
||||||
|
response := ListResponse{
|
||||||
|
Keys: keys,
|
||||||
|
Error: ResponseError(err),
|
||||||
|
}
|
||||||
|
err = request.ResponseChannel.Send(&response)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
case "Move":
|
||||||
|
sourcePath, _ := request.Parameters["SourcePath"].(string)
|
||||||
|
destPath, _ := request.Parameters["DestPath"].(string)
|
||||||
|
err := driver.Move(sourcePath, destPath)
|
||||||
|
response := MoveResponse{
|
||||||
|
Error: ResponseError(err),
|
||||||
|
}
|
||||||
|
err = request.ResponseChannel.Send(&response)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
case "Delete":
|
||||||
|
path, _ := request.Parameters["Path"].(string)
|
||||||
|
err := driver.Delete(path)
|
||||||
|
response := DeleteResponse{
|
||||||
|
Error: ResponseError(err),
|
||||||
|
}
|
||||||
|
err = request.ResponseChannel.Send(&response)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
panic(request)
|
||||||
|
}
|
||||||
|
}
|
311
storagedriver/s3/s3.go
Normal file
311
storagedriver/s3/s3.go
Normal file
|
@ -0,0 +1,311 @@
|
||||||
|
package s3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/crowdmob/goamz/aws"
|
||||||
|
"github.com/crowdmob/goamz/s3"
|
||||||
|
"github.com/docker/docker-registry/storagedriver"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/factory"
|
||||||
|
)
|
||||||
|
|
||||||
|
const DriverName = "s3"
|
||||||
|
|
||||||
|
// minChunkSize defines the minimum multipart upload chunk size
|
||||||
|
// S3 API requires multipart upload chunks to be at least 5MB
|
||||||
|
const minChunkSize = uint64(5 * 1024 * 1024)
|
||||||
|
|
||||||
|
// listPartsMax is the largest amount of parts you can request from S3
|
||||||
|
const listPartsMax = 1000
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
factory.Register(DriverName, &s3DriverFactory{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// s3DriverFactory implements the factory.StorageDriverFactory interface
|
||||||
|
type s3DriverFactory struct{}
|
||||||
|
|
||||||
|
func (factory *s3DriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) {
|
||||||
|
return FromParameters(parameters)
|
||||||
|
}
|
||||||
|
|
||||||
|
// S3Driver is a storagedriver.StorageDriver implementation backed by Amazon S3
|
||||||
|
// Objects are stored at absolute keys in the provided bucket
|
||||||
|
type S3Driver struct {
|
||||||
|
S3 *s3.S3
|
||||||
|
Bucket *s3.Bucket
|
||||||
|
Encrypt bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromParameters constructs a new S3Driver with a given parameters map
|
||||||
|
// Required parameters:
|
||||||
|
// - accesskey
|
||||||
|
// - secretkey
|
||||||
|
// - region
|
||||||
|
// - bucket
|
||||||
|
// - encrypt
|
||||||
|
func FromParameters(parameters map[string]string) (*S3Driver, error) {
|
||||||
|
accessKey, ok := parameters["accesskey"]
|
||||||
|
if !ok || accessKey == "" {
|
||||||
|
return nil, fmt.Errorf("No accesskey parameter provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
secretKey, ok := parameters["secretkey"]
|
||||||
|
if !ok || secretKey == "" {
|
||||||
|
return nil, fmt.Errorf("No secretkey parameter provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
regionName, ok := parameters["region"]
|
||||||
|
if !ok || regionName == "" {
|
||||||
|
return nil, fmt.Errorf("No region parameter provided")
|
||||||
|
}
|
||||||
|
region := aws.GetRegion(regionName)
|
||||||
|
if region.Name == "" {
|
||||||
|
return nil, fmt.Errorf("Invalid region provided: %s", region)
|
||||||
|
}
|
||||||
|
|
||||||
|
bucket, ok := parameters["bucket"]
|
||||||
|
if !ok || bucket == "" {
|
||||||
|
return nil, fmt.Errorf("No bucket parameter provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
encrypt, ok := parameters["encrypt"]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("No encrypt parameter provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
encryptBool, err := strconv.ParseBool(encrypt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Unable to parse the encrypt parameter: %v", err)
|
||||||
|
}
|
||||||
|
return New(accessKey, secretKey, region, encryptBool, bucket)
|
||||||
|
}
|
||||||
|
|
||||||
|
// New constructs a new S3Driver with the given AWS credentials, region, encryption flag, and
|
||||||
|
// bucketName
|
||||||
|
func New(accessKey string, secretKey string, region aws.Region, encrypt bool, bucketName string) (*S3Driver, error) {
|
||||||
|
auth := aws.Auth{AccessKey: accessKey, SecretKey: secretKey}
|
||||||
|
s3obj := s3.New(auth, region)
|
||||||
|
bucket := s3obj.Bucket(bucketName)
|
||||||
|
|
||||||
|
if err := bucket.PutBucket(getPermissions()); err != nil {
|
||||||
|
s3Err, ok := err.(*s3.Error)
|
||||||
|
if !(ok && s3Err.Code == "BucketAlreadyOwnedByYou") {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &S3Driver{s3obj, bucket, encrypt}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implement the storagedriver.StorageDriver interface
|
||||||
|
|
||||||
|
func (d *S3Driver) GetContent(path string) ([]byte, error) {
|
||||||
|
return d.Bucket.Get(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) PutContent(path string, contents []byte) error {
|
||||||
|
return d.Bucket.Put(path, contents, d.getContentType(), getPermissions(), d.getOptions())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
|
||||||
|
headers := make(http.Header)
|
||||||
|
headers.Add("Range", "bytes="+strconv.FormatUint(offset, 10)+"-")
|
||||||
|
|
||||||
|
resp, err := d.Bucket.GetResponseWithHeaders(path, headers)
|
||||||
|
if resp != nil {
|
||||||
|
return resp.Body, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
chunkSize := minChunkSize
|
||||||
|
for size/chunkSize >= listPartsMax {
|
||||||
|
chunkSize *= 2
|
||||||
|
}
|
||||||
|
|
||||||
|
partNumber := 1
|
||||||
|
totalRead := uint64(0)
|
||||||
|
multi, parts, err := d.getAllParts(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if (offset) > uint64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) {
|
||||||
|
return storagedriver.InvalidOffsetError{path, offset}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(parts) > 0 {
|
||||||
|
partNumber = int(offset/chunkSize) + 1
|
||||||
|
totalRead = offset
|
||||||
|
parts = parts[0 : partNumber-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, chunkSize)
|
||||||
|
for {
|
||||||
|
bytesRead, err := io.ReadFull(reader, buf)
|
||||||
|
totalRead += uint64(bytesRead)
|
||||||
|
|
||||||
|
if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
|
||||||
|
return err
|
||||||
|
} else if (uint64(bytesRead) < chunkSize) && totalRead != size {
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
part, err := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:bytesRead]))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
parts = append(parts, part)
|
||||||
|
if totalRead == size {
|
||||||
|
multi.Complete(parts)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
partNumber++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) ResumeWritePosition(path string) (uint64, error) {
|
||||||
|
_, parts, err := d.getAllParts(path)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(parts) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return (((uint64(len(parts)) - 1) * uint64(parts[0].Size)) + uint64(parts[len(parts)-1].Size)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) List(prefix string) ([]string, error) {
|
||||||
|
if prefix[len(prefix)-1] != '/' {
|
||||||
|
prefix = prefix + "/"
|
||||||
|
}
|
||||||
|
listResponse, err := d.Bucket.List(prefix, "/", "", listPartsMax)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
files := []string{}
|
||||||
|
directories := []string{}
|
||||||
|
|
||||||
|
for {
|
||||||
|
for _, key := range listResponse.Contents {
|
||||||
|
files = append(files, key.Key)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, commonPrefix := range listResponse.CommonPrefixes {
|
||||||
|
directories = append(directories, commonPrefix[0:len(commonPrefix)-1])
|
||||||
|
}
|
||||||
|
|
||||||
|
if listResponse.IsTruncated {
|
||||||
|
listResponse, err = d.Bucket.List(prefix, "/", listResponse.NextMarker, listPartsMax)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return append(files, directories...), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) Move(sourcePath string, destPath string) error {
|
||||||
|
/* This is terrible, but aws doesn't have an actual move. */
|
||||||
|
_, err := d.Bucket.PutCopy(destPath, getPermissions(), s3.CopyOptions{d.getOptions(), "", d.getContentType()}, d.Bucket.Name+"/"+sourcePath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return d.Delete(sourcePath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) Delete(path string) error {
|
||||||
|
listResponse, err := d.Bucket.List(path, "", "", listPartsMax)
|
||||||
|
if err != nil || len(listResponse.Contents) == 0 {
|
||||||
|
return storagedriver.PathNotFoundError{path}
|
||||||
|
}
|
||||||
|
|
||||||
|
s3Objects := make([]s3.Object, listPartsMax)
|
||||||
|
|
||||||
|
for len(listResponse.Contents) > 0 {
|
||||||
|
for index, key := range listResponse.Contents {
|
||||||
|
s3Objects[index].Key = key.Key
|
||||||
|
}
|
||||||
|
|
||||||
|
err := d.Bucket.DelMulti(s3.Delete{false, s3Objects[0:len(listResponse.Contents)]})
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
listResponse, err = d.Bucket.List(path, "", "", listPartsMax)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) getHighestIdMulti(path string) (multi *s3.Multi, err error) {
|
||||||
|
multis, _, err := d.Bucket.ListMulti(path, "")
|
||||||
|
if err != nil && !hasCode(err, "NoSuchUpload") {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
uploadId := ""
|
||||||
|
|
||||||
|
if len(multis) > 0 {
|
||||||
|
for _, m := range multis {
|
||||||
|
if m.Key == path && m.UploadId >= uploadId {
|
||||||
|
uploadId = m.UploadId
|
||||||
|
multi = m
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return multi, nil
|
||||||
|
} else {
|
||||||
|
multi, err := d.Bucket.InitMulti(path, d.getContentType(), getPermissions(), d.getOptions())
|
||||||
|
return multi, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) getAllParts(path string) (*s3.Multi, []s3.Part, error) {
|
||||||
|
multi, err := d.getHighestIdMulti(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
parts, err := multi.ListParts()
|
||||||
|
return multi, parts, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func hasCode(err error, code string) bool {
|
||||||
|
s3err, ok := err.(*aws.Error)
|
||||||
|
return ok && s3err.Code == code
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) getOptions() s3.Options {
|
||||||
|
return s3.Options{SSE: d.Encrypt}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPermissions() s3.ACL {
|
||||||
|
return s3.Private
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *S3Driver) getContentType() string {
|
||||||
|
return "application/octet-stream"
|
||||||
|
}
|
48
storagedriver/s3/s3_test.go
Normal file
48
storagedriver/s3/s3_test.go
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
package s3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/crowdmob/goamz/aws"
|
||||||
|
"github.com/docker/docker-registry/storagedriver"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/testsuites"
|
||||||
|
. "gopkg.in/check.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Hook up gocheck into the "go test" runner.
|
||||||
|
func Test(t *testing.T) { TestingT(t) }
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
accessKey := os.Getenv("AWS_ACCESS_KEY")
|
||||||
|
secretKey := os.Getenv("AWS_SECRET_KEY")
|
||||||
|
region := os.Getenv("AWS_REGION")
|
||||||
|
bucket := os.Getenv("S3_BUCKET")
|
||||||
|
encrypt := os.Getenv("S3_ENCRYPT")
|
||||||
|
|
||||||
|
s3DriverConstructor := func() (storagedriver.StorageDriver, error) {
|
||||||
|
shouldEncrypt, err := strconv.ParseBool(encrypt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return New(accessKey, secretKey, aws.GetRegion(region), shouldEncrypt, bucket)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip S3 storage driver tests if environment variable parameters are not provided
|
||||||
|
skipCheck := func() string {
|
||||||
|
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
|
||||||
|
return "Must set AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, S3_BUCKET, and S3_ENCRYPT to run S3 tests"
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
testsuites.RegisterInProcessSuite(s3DriverConstructor, skipCheck)
|
||||||
|
testsuites.RegisterIPCSuite(DriverName, map[string]string{
|
||||||
|
"accesskey": accessKey,
|
||||||
|
"secretkey": secretKey,
|
||||||
|
"region": region,
|
||||||
|
"bucket": bucket,
|
||||||
|
"encrypt": encrypt,
|
||||||
|
}, skipCheck)
|
||||||
|
}
|
63
storagedriver/storagedriver.go
Normal file
63
storagedriver/storagedriver.go
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
package storagedriver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StorageDriver defines methods that a Storage Driver must implement for a filesystem-like
|
||||||
|
// key/value object storage
|
||||||
|
type StorageDriver interface {
|
||||||
|
// GetContent retrieves the content stored at "path" as a []byte
|
||||||
|
// Should primarily be used for small objects
|
||||||
|
GetContent(path string) ([]byte, error)
|
||||||
|
|
||||||
|
// PutContent stores the []byte content at a location designated by "path"
|
||||||
|
// Should primarily be used for small objects
|
||||||
|
PutContent(path string, content []byte) error
|
||||||
|
|
||||||
|
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a given byte
|
||||||
|
// offset
|
||||||
|
// May be used to resume reading a stream by providing a nonzero offset
|
||||||
|
ReadStream(path string, offset uint64) (io.ReadCloser, error)
|
||||||
|
|
||||||
|
// WriteStream stores the contents of the provided io.ReadCloser at a location designated by
|
||||||
|
// the given path
|
||||||
|
// The driver will know it has received the full contents when it has read "size" bytes
|
||||||
|
// May be used to resume writing a stream by providing a nonzero offset
|
||||||
|
// The offset must be no larger than the ResumeWritePosition for this path
|
||||||
|
WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error
|
||||||
|
|
||||||
|
// ResumeWritePosition retrieves the byte offset at which it is safe to continue writing at the
|
||||||
|
// given path
|
||||||
|
ResumeWritePosition(path string) (uint64, error)
|
||||||
|
|
||||||
|
// List returns a list of the objects that are direct descendants of the given path
|
||||||
|
List(path string) ([]string, error)
|
||||||
|
|
||||||
|
// Move moves an object stored at sourcePath to destPath, removing the original object
|
||||||
|
// Note: This may be no more efficient than a copy followed by a delete for many implementations
|
||||||
|
Move(sourcePath string, destPath string) error
|
||||||
|
|
||||||
|
// Delete recursively deletes all objects stored at "path" and its subpaths
|
||||||
|
Delete(path string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// PathNotFoundError is returned when operating on a nonexistent path
|
||||||
|
type PathNotFoundError struct {
|
||||||
|
Path string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (err PathNotFoundError) Error() string {
|
||||||
|
return fmt.Sprintf("Path not found: %s", err.Path)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InvalidOffsetError is returned when attempting to read or write from an invalid offset
|
||||||
|
type InvalidOffsetError struct {
|
||||||
|
Path string
|
||||||
|
Offset uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (err InvalidOffsetError) Error() string {
|
||||||
|
return fmt.Sprintf("Invalid offset: %d for path: %s", err.Offset, err.Path)
|
||||||
|
}
|
370
storagedriver/testsuites/testsuites.go
Normal file
370
storagedriver/testsuites/testsuites.go
Normal file
|
@ -0,0 +1,370 @@
|
||||||
|
package testsuites
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io/ioutil"
|
||||||
|
"math/rand"
|
||||||
|
"path"
|
||||||
|
"sort"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/docker/docker-registry/storagedriver"
|
||||||
|
"github.com/docker/docker-registry/storagedriver/ipc"
|
||||||
|
|
||||||
|
. "gopkg.in/check.v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Hook up gocheck into the "go test" runner
|
||||||
|
func Test(t *testing.T) { TestingT(t) }
|
||||||
|
|
||||||
|
// RegisterInProcessSuite registers an in-process storage driver test suite with the go test runner
|
||||||
|
func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipCheck) {
|
||||||
|
Suite(&DriverSuite{
|
||||||
|
Constructor: driverConstructor,
|
||||||
|
SkipCheck: skipCheck,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterIPCSuite registers a storage driver test suite which runs the named driver as a child
|
||||||
|
// process with the given parameters
|
||||||
|
func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) {
|
||||||
|
suite := &DriverSuite{
|
||||||
|
Constructor: func() (storagedriver.StorageDriver, error) {
|
||||||
|
d, err := ipc.NewDriverClient(driverName, ipcParams)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = d.Start()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return d, nil
|
||||||
|
},
|
||||||
|
SkipCheck: skipCheck,
|
||||||
|
}
|
||||||
|
suite.Teardown = func() error {
|
||||||
|
if suite.StorageDriver == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
driverClient := suite.StorageDriver.(*ipc.StorageDriverClient)
|
||||||
|
return driverClient.Stop()
|
||||||
|
}
|
||||||
|
Suite(suite)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SkipCheck is a function used to determine if a test suite should be skipped
|
||||||
|
// If a SkipCheck returns a non-empty skip reason, the suite is skipped with the given reason
|
||||||
|
type SkipCheck func() (reason string)
|
||||||
|
|
||||||
|
// NeverSkip is a default SkipCheck which never skips the suite
|
||||||
|
var NeverSkip SkipCheck = func() string { return "" }
|
||||||
|
|
||||||
|
// DriverConstructor is a function which returns a new storagedriver.StorageDriver
|
||||||
|
type DriverConstructor func() (storagedriver.StorageDriver, error)
|
||||||
|
|
||||||
|
// DriverTeardown is a function which cleans up a suite's storagedriver.StorageDriver
|
||||||
|
type DriverTeardown func() error
|
||||||
|
|
||||||
|
// DriverSuite is a gocheck test suite designed to test a storagedriver.StorageDriver
|
||||||
|
// The intended way to create a DriverSuite is with RegisterInProcessSuite or RegisterIPCSuite
|
||||||
|
type DriverSuite struct {
|
||||||
|
Constructor DriverConstructor
|
||||||
|
Teardown DriverTeardown
|
||||||
|
SkipCheck
|
||||||
|
storagedriver.StorageDriver
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) SetUpSuite(c *C) {
|
||||||
|
if reason := suite.SkipCheck(); reason != "" {
|
||||||
|
c.Skip(reason)
|
||||||
|
}
|
||||||
|
d, err := suite.Constructor()
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
suite.StorageDriver = d
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TearDownSuite(c *C) {
|
||||||
|
if suite.Teardown != nil {
|
||||||
|
err := suite.Teardown()
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestWriteRead1(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
contents := []byte("a")
|
||||||
|
suite.writeReadCompare(c, filename, contents, contents)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestWriteRead2(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
contents := []byte("\xc3\x9f")
|
||||||
|
suite.writeReadCompare(c, filename, contents, contents)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestWriteRead3(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
contents := []byte(randomString(32))
|
||||||
|
suite.writeReadCompare(c, filename, contents, contents)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestWriteRead4(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
contents := []byte(randomString(1024 * 1024))
|
||||||
|
suite.writeReadCompare(c, filename, contents, contents)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestReadNonexistent(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
_, err := suite.StorageDriver.GetContent(filename)
|
||||||
|
c.Assert(err, NotNil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestWriteReadStreams1(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
contents := []byte("a")
|
||||||
|
suite.writeReadCompareStreams(c, filename, contents, contents)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestWriteReadStreams2(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
contents := []byte("\xc3\x9f")
|
||||||
|
suite.writeReadCompareStreams(c, filename, contents, contents)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestWriteReadStreams3(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
contents := []byte(randomString(32))
|
||||||
|
suite.writeReadCompareStreams(c, filename, contents, contents)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestWriteReadStreams4(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
contents := []byte(randomString(1024 * 1024))
|
||||||
|
suite.writeReadCompareStreams(c, filename, contents, contents)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestContinueStreamAppend(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
defer suite.StorageDriver.Delete(filename)
|
||||||
|
|
||||||
|
chunkSize := uint64(10 * 1024 * 1024)
|
||||||
|
|
||||||
|
contentsChunk1 := []byte(randomString(chunkSize))
|
||||||
|
contentsChunk2 := []byte(randomString(chunkSize))
|
||||||
|
contentsChunk3 := []byte(randomString(chunkSize))
|
||||||
|
|
||||||
|
fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)
|
||||||
|
|
||||||
|
err := suite.StorageDriver.WriteStream(filename, 0, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(contentsChunk1)))
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
offset, err := suite.StorageDriver.ResumeWritePosition(filename)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
if offset > chunkSize {
|
||||||
|
c.Fatalf("Offset too large, %d > %d", offset, chunkSize)
|
||||||
|
}
|
||||||
|
err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize])))
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
offset, err = suite.StorageDriver.ResumeWritePosition(filename)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
if offset > 2*chunkSize {
|
||||||
|
c.Fatalf("Offset too large, %d > %d", offset, 2*chunkSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:])))
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
received, err := suite.StorageDriver.GetContent(filename)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
c.Assert(received, DeepEquals, fullContents)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestReadStreamWithOffset(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
defer suite.StorageDriver.Delete(filename)
|
||||||
|
|
||||||
|
chunkSize := uint64(32)
|
||||||
|
|
||||||
|
contentsChunk1 := []byte(randomString(chunkSize))
|
||||||
|
contentsChunk2 := []byte(randomString(chunkSize))
|
||||||
|
contentsChunk3 := []byte(randomString(chunkSize))
|
||||||
|
|
||||||
|
err := suite.StorageDriver.PutContent(filename, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...))
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
reader, err := suite.StorageDriver.ReadStream(filename, 0)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
readContents, err := ioutil.ReadAll(reader)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
c.Assert(readContents, DeepEquals, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...))
|
||||||
|
|
||||||
|
reader, err = suite.StorageDriver.ReadStream(filename, chunkSize)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
readContents, err = ioutil.ReadAll(reader)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
c.Assert(readContents, DeepEquals, append(contentsChunk2, contentsChunk3...))
|
||||||
|
|
||||||
|
reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*2)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
readContents, err = ioutil.ReadAll(reader)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
c.Assert(readContents, DeepEquals, contentsChunk3)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestReadNonexistentStream(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
_, err := suite.StorageDriver.ReadStream(filename, 0)
|
||||||
|
c.Assert(err, NotNil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestList(c *C) {
|
||||||
|
rootDirectory := randomString(uint64(8 + rand.Intn(8)))
|
||||||
|
defer suite.StorageDriver.Delete(rootDirectory)
|
||||||
|
|
||||||
|
parentDirectory := rootDirectory + "/" + randomString(uint64(8+rand.Intn(8)))
|
||||||
|
childFiles := make([]string, 50)
|
||||||
|
for i := 0; i < len(childFiles); i++ {
|
||||||
|
childFile := parentDirectory + "/" + randomString(uint64(8+rand.Intn(8)))
|
||||||
|
childFiles[i] = childFile
|
||||||
|
err := suite.StorageDriver.PutContent(childFile, []byte(randomString(32)))
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
}
|
||||||
|
sort.Strings(childFiles)
|
||||||
|
|
||||||
|
keys, err := suite.StorageDriver.List(rootDirectory)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
c.Assert(keys, DeepEquals, []string{parentDirectory})
|
||||||
|
|
||||||
|
keys, err = suite.StorageDriver.List(parentDirectory)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
sort.Strings(keys)
|
||||||
|
c.Assert(keys, DeepEquals, childFiles)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestMove(c *C) {
|
||||||
|
contents := []byte(randomString(32))
|
||||||
|
sourcePath := randomString(32)
|
||||||
|
destPath := randomString(32)
|
||||||
|
|
||||||
|
defer suite.StorageDriver.Delete(sourcePath)
|
||||||
|
defer suite.StorageDriver.Delete(destPath)
|
||||||
|
|
||||||
|
err := suite.StorageDriver.PutContent(sourcePath, contents)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
err = suite.StorageDriver.Move(sourcePath, destPath)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
received, err := suite.StorageDriver.GetContent(destPath)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
c.Assert(received, DeepEquals, contents)
|
||||||
|
|
||||||
|
_, err = suite.StorageDriver.GetContent(sourcePath)
|
||||||
|
c.Assert(err, NotNil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestMoveNonexistent(c *C) {
|
||||||
|
sourcePath := randomString(32)
|
||||||
|
destPath := randomString(32)
|
||||||
|
|
||||||
|
err := suite.StorageDriver.Move(sourcePath, destPath)
|
||||||
|
c.Assert(err, NotNil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestRemove(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
contents := []byte(randomString(32))
|
||||||
|
|
||||||
|
defer suite.StorageDriver.Delete(filename)
|
||||||
|
|
||||||
|
err := suite.StorageDriver.PutContent(filename, contents)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
err = suite.StorageDriver.Delete(filename)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
_, err = suite.StorageDriver.GetContent(filename)
|
||||||
|
c.Assert(err, NotNil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestRemoveNonexistent(c *C) {
|
||||||
|
filename := randomString(32)
|
||||||
|
err := suite.StorageDriver.Delete(filename)
|
||||||
|
c.Assert(err, NotNil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) TestRemoveFolder(c *C) {
|
||||||
|
dirname := randomString(32)
|
||||||
|
filename1 := randomString(32)
|
||||||
|
filename2 := randomString(32)
|
||||||
|
contents := []byte(randomString(32))
|
||||||
|
|
||||||
|
defer suite.StorageDriver.Delete(path.Join(dirname, filename1))
|
||||||
|
defer suite.StorageDriver.Delete(path.Join(dirname, filename2))
|
||||||
|
|
||||||
|
err := suite.StorageDriver.PutContent(path.Join(dirname, filename1), contents)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
err = suite.StorageDriver.PutContent(path.Join(dirname, filename2), contents)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
err = suite.StorageDriver.Delete(dirname)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
_, err = suite.StorageDriver.GetContent(path.Join(dirname, filename1))
|
||||||
|
c.Assert(err, NotNil)
|
||||||
|
|
||||||
|
_, err = suite.StorageDriver.GetContent(path.Join(dirname, filename2))
|
||||||
|
c.Assert(err, NotNil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) writeReadCompare(c *C, filename string, contents, expected []byte) {
|
||||||
|
defer suite.StorageDriver.Delete(filename)
|
||||||
|
|
||||||
|
err := suite.StorageDriver.PutContent(filename, contents)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
readContents, err := suite.StorageDriver.GetContent(filename)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
c.Assert(readContents, DeepEquals, contents)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *DriverSuite) writeReadCompareStreams(c *C, filename string, contents, expected []byte) {
|
||||||
|
defer suite.StorageDriver.Delete(filename)
|
||||||
|
|
||||||
|
err := suite.StorageDriver.WriteStream(filename, 0, uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents)))
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
reader, err := suite.StorageDriver.ReadStream(filename, 0)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
defer reader.Close()
|
||||||
|
|
||||||
|
readContents, err := ioutil.ReadAll(reader)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
c.Assert(readContents, DeepEquals, contents)
|
||||||
|
}
|
||||||
|
|
||||||
|
var pathChars = []byte("abcdefghijklmnopqrstuvwxyz")
|
||||||
|
|
||||||
|
func randomString(length uint64) string {
|
||||||
|
b := make([]byte, length)
|
||||||
|
for i := range b {
|
||||||
|
b[i] = pathChars[rand.Intn(len(pathChars))]
|
||||||
|
}
|
||||||
|
return string(b)
|
||||||
|
}
|
Loading…
Reference in a new issue