Merge pull request #820 from stevvooe/ng-storagedriver-updates

StorageDriver interface changes and cleanup
This commit is contained in:
Olivier Gambier 2014-12-08 12:48:15 -08:00
commit f3f70d151d
28 changed files with 999 additions and 324 deletions

View file

@ -21,8 +21,11 @@ test:
- test -z $(gofmt -s -l . | tee /dev/stderr)
- go vet ./...
- test -z $(golint ./... | tee /dev/stderr)
- go test -race -test.v ./...:
timeout: 600
- go test -test.v ./...
# Disabling the race detector due to massive memory usage.
# - go test -race -test.v ./...:
# timeout: 600
# TODO(stevvooe): The following is an attempt at using goveralls but it
# just doesn't work. goveralls requires a single profile file to be

View file

@ -91,7 +91,7 @@ func TestPush(t *testing.T) {
}
handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{
testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "PUT",
Route: "/v2/" + name + "/manifest/" + tag,
@ -184,7 +184,7 @@ func TestPull(t *testing.T) {
}
handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{
testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + name + "/manifest/" + tag,
@ -307,7 +307,7 @@ func TestPullResume(t *testing.T) {
for i := 0; i < 3; i++ {
layerRequestResponseMappings = append(layerRequestResponseMappings, testutil.RequestResponseMap{
testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + name + "/manifest/" + tag,

View file

@ -1,3 +1,5 @@
// +build ignore
package main
import (

View file

@ -1,3 +1,5 @@
// +build ignore
package main
import (

View file

@ -1,3 +1,5 @@
// +build ignore
package main
import (

View file

@ -1,3 +1,5 @@
// +build ignore
package main
import (

View file

@ -15,7 +15,6 @@ import (
"github.com/docker/docker-registry/configuration"
_ "github.com/docker/docker-registry/storagedriver/filesystem"
_ "github.com/docker/docker-registry/storagedriver/inmemory"
_ "github.com/docker/docker-registry/storagedriver/s3"
)
func main() {

View file

@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"time"
"github.com/docker/docker-registry/storagedriver"
)
@ -18,6 +19,7 @@ type fileReader struct {
// identifying fields
path string
size int64 // size is the total layer size, must be set.
modtime time.Time
// mutable fields
rc io.ReadCloser // remote read closer
@ -28,16 +30,21 @@ type fileReader struct {
func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) {
// Grab the size of the layer file, ensuring existence.
size, err := driver.CurrentSize(path)
fi, err := driver.Stat(path)
if err != nil {
return nil, err
}
if fi.IsDir() {
return nil, fmt.Errorf("cannot read a directory")
}
return &fileReader{
driver: driver,
path: path,
size: int64(size),
size: fi.Size(),
modtime: fi.ModTime(),
}, nil
}
@ -126,7 +133,7 @@ func (fr *fileReader) reader() (io.Reader, error) {
}
// If we don't have a reader, open one up.
rc, err := fr.driver.ReadStream(fr.path, uint64(fr.offset))
rc, err := fr.driver.ReadStream(fr.path, fr.offset)
if err != nil {
return nil, err

View file

@ -13,7 +13,6 @@ type layerReader struct {
name string // repo name of this layer
digest digest.Digest
createdAt time.Time
}
var _ Layer = &layerReader{}
@ -27,5 +26,5 @@ func (lrs *layerReader) Digest() digest.Digest {
}
func (lrs *layerReader) CreatedAt() time.Time {
return lrs.createdAt
return lrs.modtime
}

View file

@ -1,8 +1,6 @@
package storage
import (
"time"
"github.com/docker/docker-registry/digest"
"github.com/docker/docker-registry/storagedriver"
)
@ -55,11 +53,6 @@ func (ls *layerStore) Fetch(name string, digest digest.Digest) (Layer, error) {
fileReader: *fr,
name: name,
digest: digest,
// TODO(stevvooe): Storage backend does not support modification time
// queries yet. Layers "never" change, so just return the zero value
// plus a nano-second.
createdAt: (time.Time{}).Add(time.Nanosecond),
}, nil
}

View file

@ -107,9 +107,13 @@ func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Laye
return nil, err
}
if err := luc.writeLayer(fp, size, digest); err != nil {
if nn, err := luc.writeLayer(fp, digest); err != nil {
// Cleanup?
return nil, err
} else if nn != size {
// TODO(stevvooe): Short write. Will have to delete the location and
// report an error. This error needs to be reported to the client.
return nil, fmt.Errorf("short write writing layer")
}
// Yes! We have written some layer data. Let's make it visible. Link the
@ -281,19 +285,20 @@ func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst d
return dgst, nil
}
// writeLayer actually writes the the layer file into its final destination.
// The layer should be validated before commencing the write.
func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst digest.Digest) error {
// writeLayer actually writes the the layer file into its final destination,
// identified by dgst. The layer should be validated before commencing the
// write.
func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) {
blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
digest: dgst,
})
if err != nil {
return err
return 0, err
}
// Check for existence
if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil {
if _, err := luc.layerStore.driver.Stat(blobPath); err != nil {
// TODO(stevvooe): This check is kind of problematic and very racy.
switch err := err.(type) {
case storagedriver.PathNotFoundError:
@ -303,22 +308,18 @@ func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst dige
// content addressable and we should just use this to ensure we
// have it written. Although, we do need to verify that the
// content that is there is the correct length.
return err
return 0, err
}
}
// Seek our local layer file back now.
if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
// Cleanup?
return err
return 0, err
}
// Okay: we can write the file to the blob store.
if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil {
return err
}
return nil
return luc.layerStore.driver.WriteStream(blobPath, 0, fp)
}
// linkLayer links a valid, written layer blob into the registry under the

View file

@ -22,12 +22,21 @@ func (ms *manifestStore) Exists(name, tag string) (bool, error) {
return false, err
}
size, err := ms.driver.CurrentSize(p)
fi, err := ms.driver.Stat(p)
if err != nil {
switch err.(type) {
case storagedriver.PathNotFoundError:
return false, nil
default:
return false, err
}
}
if size == 0 {
if fi.IsDir() {
return false, fmt.Errorf("unexpected directory at path: %v, name=%s tag=%s", p, name, tag)
}
if fi.Size() == 0 {
return false, nil
}

View file

@ -1,3 +1,5 @@
// +build ignore
// Package azure provides a storagedriver.StorageDriver implementation to
// store blobs in Microsoft Azure Blob Storage Service.
package azure
@ -103,7 +105,7 @@ func (d *Driver) PutContent(path string, contents []byte) error {
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
if ok, err := d.client.BlobExists(d.container, path); err != nil {
return nil, err
} else if !ok {
@ -115,7 +117,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
return nil, err
}
if offset >= size {
if offset >= int64(size) {
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}
@ -129,10 +131,10 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
// WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path.
func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
var (
lastBlockNum int
resumableOffset uint64
resumableOffset int64
blocks []azure.Block
)
@ -153,12 +155,12 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo
return fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error())
}
var totalSize uint64
var totalSize int64
for _, v := range parts.CommittedBlocks {
blocks = append(blocks, azure.Block{
Id: v.Name,
Status: azure.BlockStatusCommitted})
totalSize += uint64(v.Size)
totalSize += int64(v.Size)
}
// NOTE: Azure driver currently supports only append mode (resumable

View file

@ -1,3 +1,5 @@
// +build ignore
package azure
import (

View file

@ -4,7 +4,6 @@ import (
"fmt"
"github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker-registry/storagedriver/ipc"
)
// driverFactories stores an internal mapping between storage driver names and their respective
@ -41,16 +40,23 @@ func Register(name string, factory StorageDriverFactory) {
func Create(name string, parameters map[string]string) (storagedriver.StorageDriver, error) {
driverFactory, ok := driverFactories[name]
if !ok {
// No registered StorageDriverFactory found, try ipc
driverClient, err := ipc.NewDriverClient(name, parameters)
if err != nil {
return nil, InvalidStorageDriverError{name}
}
err = driverClient.Start()
if err != nil {
return nil, err
}
return driverClient, nil
// NOTE(stevvooe): We are disabling storagedriver ipc for now, as the
// server and client need to be updated for the changed API calls and
// there were some problems libchan hanging. We'll phase this
// functionality back in over the next few weeks.
// No registered StorageDriverFactory found, try ipc
// driverClient, err := ipc.NewDriverClient(name, parameters)
// if err != nil {
// return nil, InvalidStorageDriverError{name}
// }
// err = driverClient.Start()
// if err != nil {
// return nil, err
// }
// return driverClient, nil
}
return driverFactory.Create(parameters)
}

79
storagedriver/fileinfo.go Normal file
View file

@ -0,0 +1,79 @@
package storagedriver
import "time"
// FileInfo returns information about a given path. Inspired by os.FileInfo,
// it elides the base name method for a full path instead.
type FileInfo interface {
// Path provides the full path of the target of this file info.
Path() string
// Size returns current length in bytes of the file. The return value can
// be used to write to the end of the file at path. The value is
// meaningless if IsDir returns true.
Size() int64
// ModTime returns the modification time for the file. For backends that
// don't have a modification time, the creation time should be returned.
ModTime() time.Time
// IsDir returns true if the path is a directory.
IsDir() bool
}
// NOTE(stevvooe): The next two types, FileInfoFields and FileInfoInternal
// should only be used by storagedriver implementations. They should moved to
// a "driver" package, similar to database/sql.
// FileInfoFields provides the exported fields for implementing FileInfo
// interface in storagedriver implementations. It should be used with
// InternalFileInfo.
type FileInfoFields struct {
// Path provides the full path of the target of this file info.
Path string
// Size is current length in bytes of the file. The value of this field
// can be used to write to the end of the file at path. The value is
// meaningless if IsDir is set to true.
Size int64
// ModTime returns the modification time for the file. For backends that
// don't have a modification time, the creation time should be returned.
ModTime time.Time
// IsDir returns true if the path is a directory.
IsDir bool
}
// FileInfoInternal implements the FileInfo interface. This should only be
// used by storagedriver implementations that don't have a specialized
// FileInfo type.
type FileInfoInternal struct {
FileInfoFields
}
var _ FileInfo = FileInfoInternal{}
var _ FileInfo = &FileInfoInternal{}
// Path provides the full path of the target of this file info.
func (fi FileInfoInternal) Path() string {
return fi.FileInfoFields.Path
}
// Size returns current length in bytes of the file. The return value can
// be used to write to the end of the file at path. The value is
// meaningless if IsDir returns true.
func (fi FileInfoInternal) Size() int64 {
return fi.FileInfoFields.Size
}
// ModTime returns the modification time for the file. For backends that
// don't have a modification time, the creation time should be returned.
func (fi FileInfoInternal) ModTime() time.Time {
return fi.FileInfoFields.ModTime
}
// IsDir returns true if the path is a directory.
func (fi FileInfoInternal) IsDir() bool {
return fi.FileInfoFields.IsDir
}

View file

@ -1,10 +1,13 @@
package filesystem
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"time"
"github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker-registry/storagedriver/factory"
@ -49,43 +52,49 @@ func New(rootDirectory string) *Driver {
return &Driver{rootDirectory}
}
// subPath returns the absolute path of a key within the Driver's storage
func (d *Driver) subPath(subPath string) string {
return path.Join(d.rootDirectory, subPath)
}
// Implement the storagedriver.StorageDriver interface
// GetContent retrieves the content stored at "path" as a []byte.
func (d *Driver) GetContent(path string) ([]byte, error) {
contents, err := ioutil.ReadFile(d.subPath(path))
rc, err := d.ReadStream(path, 0)
if err != nil {
return nil, storagedriver.PathNotFoundError{Path: path}
return nil, err
}
return contents, nil
defer rc.Close()
p, err := ioutil.ReadAll(rc)
if err != nil {
return nil, err
}
return p, nil
}
// PutContent stores the []byte content at a location designated by "path".
func (d *Driver) PutContent(subPath string, contents []byte) error {
fullPath := d.subPath(subPath)
parentDir := path.Dir(fullPath)
err := os.MkdirAll(parentDir, 0755)
if err != nil {
if _, err := d.WriteStream(subPath, 0, bytes.NewReader(contents)); err != nil {
return err
}
err = ioutil.WriteFile(fullPath, contents, 0644)
return err
return os.Truncate(d.fullPath(subPath), int64(len(contents)))
}
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
file, err := os.OpenFile(d.subPath(path), os.O_RDONLY, 0644)
func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
if offset < 0 {
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}
file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
if err != nil {
if os.IsNotExist(err) {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return nil, err
}
seekPos, err := file.Seek(int64(offset), os.SEEK_SET)
if err != nil {
file.Close()
@ -98,79 +107,64 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
return file, nil
}
// WriteStream stores the contents of the provided io.ReadCloser at a location
// WriteStream stores the contents of the provided io.Reader at a location
// designated by the given path.
func (d *Driver) WriteStream(subPath string, offset, size uint64, reader io.ReadCloser) error {
defer reader.Close()
resumableOffset, err := d.CurrentSize(subPath)
if _, pathNotFound := err.(storagedriver.PathNotFoundError); err != nil && !pathNotFound {
return err
func (d *Driver) WriteStream(subPath string, offset int64, reader io.Reader) (nn int64, err error) {
if offset < 0 {
return 0, storagedriver.InvalidOffsetError{Path: subPath, Offset: offset}
}
if offset > resumableOffset {
return storagedriver.InvalidOffsetError{Path: subPath, Offset: offset}
}
// TODO(stevvooe): This needs to be a requirement.
// if !path.IsAbs(subPath) {
// return fmt.Errorf("absolute path required: %q", subPath)
// }
fullPath := d.subPath(subPath)
fullPath := d.fullPath(subPath)
parentDir := path.Dir(fullPath)
err = os.MkdirAll(parentDir, 0755)
if err := os.MkdirAll(parentDir, 0755); err != nil {
return 0, err
}
fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return err
}
var file *os.File
if offset == 0 {
file, err = os.Create(fullPath)
} else {
file, err = os.OpenFile(fullPath, os.O_WRONLY|os.O_APPEND, 0)
// TODO(stevvooe): A few missing conditions in storage driver:
// 1. What if the path is already a directory?
// 2. Should number 1 be exposed explicitly in storagedriver?
// 2. Can this path not exist, even if we create above?
return 0, err
}
defer fp.Close()
nn, err = fp.Seek(offset, os.SEEK_SET)
if err != nil {
return err
return 0, err
}
defer file.Close()
buf := make([]byte, 32*1024)
for {
bytesRead, er := reader.Read(buf)
if bytesRead > 0 {
bytesWritten, ew := file.WriteAt(buf[0:bytesRead], int64(offset))
if bytesWritten > 0 {
offset += uint64(bytesWritten)
if nn != offset {
return 0, fmt.Errorf("bad seek to %v, expected %v in fp=%v", offset, nn, fp)
}
if ew != nil {
err = ew
break
}
if bytesRead != bytesWritten {
err = io.ErrShortWrite
break
}
}
if er == io.EOF {
break
}
if er != nil {
err = er
break
}
}
return err
return io.Copy(fp, reader)
}
// CurrentSize retrieves the curernt size in bytes of the object at the given
// path.
func (d *Driver) CurrentSize(subPath string) (uint64, error) {
fullPath := d.subPath(subPath)
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *Driver) Stat(subPath string) (storagedriver.FileInfo, error) {
fullPath := d.fullPath(subPath)
fileInfo, err := os.Stat(fullPath)
if err != nil && !os.IsNotExist(err) {
return 0, err
} else if err != nil {
return 0, storagedriver.PathNotFoundError{Path: subPath}
fi, err := os.Stat(fullPath)
if err != nil {
if os.IsNotExist(err) {
return nil, storagedriver.PathNotFoundError{Path: subPath}
}
return uint64(fileInfo.Size()), nil
return nil, err
}
return fileInfo{
path: subPath,
FileInfo: fi,
}, nil
}
// List returns a list of the objects that are direct descendants of the given
@ -179,7 +173,7 @@ func (d *Driver) List(subPath string) ([]string, error) {
if subPath[len(subPath)-1] != '/' {
subPath += "/"
}
fullPath := d.subPath(subPath)
fullPath := d.fullPath(subPath)
dir, err := os.Open(fullPath)
if err != nil {
@ -202,8 +196,8 @@ func (d *Driver) List(subPath string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (d *Driver) Move(sourcePath string, destPath string) error {
source := d.subPath(sourcePath)
dest := d.subPath(destPath)
source := d.fullPath(sourcePath)
dest := d.fullPath(destPath)
if _, err := os.Stat(source); os.IsNotExist(err) {
return storagedriver.PathNotFoundError{Path: sourcePath}
@ -215,7 +209,7 @@ func (d *Driver) Move(sourcePath string, destPath string) error {
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *Driver) Delete(subPath string) error {
fullPath := d.subPath(subPath)
fullPath := d.fullPath(subPath)
_, err := os.Stat(fullPath)
if err != nil && !os.IsNotExist(err) {
@ -227,3 +221,42 @@ func (d *Driver) Delete(subPath string) error {
err = os.RemoveAll(fullPath)
return err
}
// fullPath returns the absolute path of a key within the Driver's storage.
func (d *Driver) fullPath(subPath string) string {
return path.Join(d.rootDirectory, subPath)
}
type fileInfo struct {
os.FileInfo
path string
}
var _ storagedriver.FileInfo = fileInfo{}
// Path provides the full path of the target of this file info.
func (fi fileInfo) Path() string {
return fi.path
}
// Size returns current length in bytes of the file. The return value can
// be used to write to the end of the file at path. The value is
// meaningless if IsDir returns true.
func (fi fileInfo) Size() int64 {
if fi.IsDir() {
return 0
}
return fi.FileInfo.Size()
}
// ModTime returns the modification time for the file. For backends that
// don't have a modification time, the creation time should be returned.
func (fi fileInfo) ModTime() time.Time {
return fi.FileInfo.ModTime()
}
// IsDir returns true if the path is a directory.
func (fi fileInfo) IsDir() bool {
return fi.FileInfo.IsDir()
}

View file

@ -1,6 +1,7 @@
package filesystem
import (
"io/ioutil"
"os"
"testing"
@ -13,12 +14,16 @@ import (
func Test(t *testing.T) { TestingT(t) }
func init() {
rootDirectory := "/tmp/driver"
os.RemoveAll(rootDirectory)
filesystemDriverConstructor := func() (storagedriver.StorageDriver, error) {
return New(rootDirectory), nil
root, err := ioutil.TempDir("", "driver-")
if err != nil {
panic(err)
}
testsuites.RegisterInProcessSuite(filesystemDriverConstructor, testsuites.NeverSkip)
testsuites.RegisterIPCSuite(driverName, map[string]string{"rootdirectory": rootDirectory}, testsuites.NeverSkip)
defer os.Remove(root)
testsuites.RegisterInProcessSuite(func() (storagedriver.StorageDriver, error) {
return New(root), nil
}, testsuites.NeverSkip)
// BUG(stevvooe): IPC is broken so we're disabling for now. Will revisit later.
// testsuites.RegisterIPCSuite(driverName, map[string]string{"rootdirectory": root}, testsuites.NeverSkip)
}

View file

@ -5,9 +5,9 @@ import (
"fmt"
"io"
"io/ioutil"
"regexp"
"strings"
"sync"
"time"
"github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker-registry/storagedriver/factory"
@ -29,13 +29,18 @@ func (factory *inMemoryDriverFactory) Create(parameters map[string]string) (stor
// Driver is a storagedriver.StorageDriver implementation backed by a local map.
// Intended solely for example and testing purposes.
type Driver struct {
storage map[string][]byte
root *dir
mutex sync.RWMutex
}
// New constructs a new Driver.
func New() *Driver {
return &Driver{storage: make(map[string][]byte)}
return &Driver{root: &dir{
common: common{
p: "/",
mod: time.Now(),
},
}}
}
// Implement the storagedriver.StorageDriver interface.
@ -44,106 +49,141 @@ func New() *Driver {
func (d *Driver) GetContent(path string) ([]byte, error) {
d.mutex.RLock()
defer d.mutex.RUnlock()
contents, ok := d.storage[path]
if !ok {
return nil, storagedriver.PathNotFoundError{Path: path}
rc, err := d.ReadStream(path, 0)
if err != nil {
return nil, err
}
return contents, nil
defer rc.Close()
return ioutil.ReadAll(rc)
}
// PutContent stores the []byte content at a location designated by "path".
func (d *Driver) PutContent(path string, contents []byte) error {
func (d *Driver) PutContent(p string, contents []byte) error {
d.mutex.Lock()
defer d.mutex.Unlock()
d.storage[path] = contents
f, err := d.root.mkfile(p)
if err != nil {
// TODO(stevvooe): Again, we need to clarify when this is not a
// directory in StorageDriver API.
return fmt.Errorf("not a file")
}
f.truncate()
f.WriteAt(contents, 0)
return nil
}
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
d.mutex.RLock()
defer d.mutex.RUnlock()
contents, err := d.GetContent(path)
if err != nil {
return nil, err
} else if len(contents) <= int(offset) {
if offset < 0 {
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}
src := contents[offset:]
buf := make([]byte, len(src))
copy(buf, src)
return ioutil.NopCloser(bytes.NewReader(buf)), nil
path = d.normalize(path)
found := d.root.find(path)
if found.path() != path {
return nil, storagedriver.PathNotFoundError{Path: path}
}
if found.isdir() {
return nil, fmt.Errorf("%q is a directory", path)
}
return ioutil.NopCloser(found.(*file).sectionReader(offset)), nil
}
// WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path.
func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
defer reader.Close()
d.mutex.RLock()
defer d.mutex.RUnlock()
func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) {
d.mutex.Lock()
defer d.mutex.Unlock()
resumableOffset, err := d.CurrentSize(path)
if offset < 0 {
return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}
normalized := d.normalize(path)
f, err := d.root.mkfile(normalized)
if err != nil {
return err
return 0, fmt.Errorf("not a file")
}
if offset > resumableOffset {
return storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}
var buf bytes.Buffer
contents, err := ioutil.ReadAll(reader)
nn, err = buf.ReadFrom(reader)
if err != nil {
return err
// TODO(stevvooe): This condition is odd and we may need to clarify:
// we've read nn bytes from reader but have written nothing to the
// backend. What is the correct return value? Really, the caller needs
// to know that the reader has been advanced and reattempting the
// operation is incorrect.
return nn, err
}
if offset > 0 {
contents = append(d.storage[path][0:offset], contents...)
}
d.storage[path] = contents
return nil
f.WriteAt(buf.Bytes(), offset)
return nn, err
}
// CurrentSize retrieves the curernt size in bytes of the object at the given
// path.
func (d *Driver) CurrentSize(path string) (uint64, error) {
// Stat returns info about the provided path.
func (d *Driver) Stat(path string) (storagedriver.FileInfo, error) {
d.mutex.RLock()
defer d.mutex.RUnlock()
contents, ok := d.storage[path]
if !ok {
return 0, nil
normalized := d.normalize(path)
found := d.root.find(path)
if found.path() != normalized {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return uint64(len(contents)), nil
fi := storagedriver.FileInfoFields{
Path: path,
IsDir: found.isdir(),
ModTime: found.modtime(),
}
if !fi.IsDir {
fi.Size = int64(len(found.(*file).data))
}
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
}
// List returns a list of the objects that are direct descendants of the given
// path.
func (d *Driver) List(path string) ([]string, error) {
if path[len(path)-1] != '/' {
path += "/"
normalized := d.normalize(path)
found := d.root.find(normalized)
if !found.isdir() {
return nil, fmt.Errorf("not a directory") // TODO(stevvooe): Need error type for this...
}
subPathMatcher, err := regexp.Compile(fmt.Sprintf("^%s[^/]+", path))
entries, err := found.(*dir).list(normalized)
if err != nil {
switch err {
case errNotExists:
return nil, storagedriver.PathNotFoundError{Path: path}
case errIsNotDir:
return nil, fmt.Errorf("not a directory")
default:
return nil, err
}
d.mutex.RLock()
defer d.mutex.RUnlock()
// we use map to collect unique keys
keySet := make(map[string]struct{})
for k := range d.storage {
if key := subPathMatcher.FindString(k); key != "" {
keySet[key] = struct{}{}
}
}
keys := make([]string, 0, len(keySet))
for k := range keySet {
keys = append(keys, k)
}
return keys, nil
return entries, nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
@ -151,32 +191,37 @@ func (d *Driver) List(path string) ([]string, error) {
func (d *Driver) Move(sourcePath string, destPath string) error {
d.mutex.Lock()
defer d.mutex.Unlock()
contents, ok := d.storage[sourcePath]
if !ok {
return storagedriver.PathNotFoundError{Path: sourcePath}
normalizedSrc, normalizedDst := d.normalize(sourcePath), d.normalize(destPath)
err := d.root.move(normalizedSrc, normalizedDst)
switch err {
case errNotExists:
return storagedriver.PathNotFoundError{Path: destPath}
default:
return err
}
d.storage[destPath] = contents
delete(d.storage, sourcePath)
return nil
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *Driver) Delete(path string) error {
d.mutex.Lock()
defer d.mutex.Unlock()
var subPaths []string
for k := range d.storage {
if strings.HasPrefix(k, path) {
subPaths = append(subPaths, k)
}
}
if len(subPaths) == 0 {
normalized := d.normalize(path)
err := d.root.delete(normalized)
switch err {
case errNotExists:
return storagedriver.PathNotFoundError{Path: path}
default:
return err
}
for _, subPath := range subPaths {
delete(d.storage, subPath)
}
return nil
}
func (d *Driver) normalize(p string) string {
if !strings.HasPrefix(p, "/") {
p = "/" + p // Ghetto path absolution.
}
return p
}

View file

@ -17,5 +17,8 @@ func init() {
return New(), nil
}
testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip)
testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip)
// BUG(stevvooe): Disable flaky IPC tests for now when we can troubleshoot
// the problems with libchan.
// testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip)
}

View file

@ -0,0 +1,329 @@
package inmemory
import (
"fmt"
"io"
"path"
"sort"
"strings"
"time"
)
var (
errExists = fmt.Errorf("exists")
errNotExists = fmt.Errorf("exists")
errIsNotDir = fmt.Errorf("notdir")
errIsDir = fmt.Errorf("isdir")
)
type node interface {
name() string
path() string
isdir() bool
modtime() time.Time
}
// dir is the central type for the memory-based storagedriver. All operations
// are dispatched from a root dir.
type dir struct {
common
// TODO(stevvooe): Use sorted slice + search.
children map[string]node
}
var _ node = &dir{}
func (d *dir) isdir() bool {
return true
}
// add places the node n into dir d.
func (d *dir) add(n node) {
if d.children == nil {
d.children = make(map[string]node)
}
d.children[n.name()] = n
d.mod = time.Now()
}
// find searches for the node, given path q in dir. If the node is found, it
// will be returned. If the node is not found, the closet existing parent. If
// the node is found, the returned (node).path() will match q.
func (d *dir) find(q string) node {
q = strings.Trim(q, "/")
i := strings.Index(q, "/")
if q == "" {
return d
}
if i == 0 {
panic("shouldn't happen, no root paths")
}
var component string
if i < 0 {
// No more path components
component = q
} else {
component = q[:i]
}
child, ok := d.children[component]
if !ok {
// Node was not found. Return p and the current node.
return d
}
if child.isdir() {
// traverse down!
q = q[i+1:]
return child.(*dir).find(q)
}
return child
}
func (d *dir) list(p string) ([]string, error) {
n := d.find(p)
if n.path() != p {
return nil, errNotExists
}
if !n.isdir() {
return nil, errIsNotDir
}
var children []string
for _, child := range n.(*dir).children {
children = append(children, child.path())
}
sort.Strings(children)
return children, nil
}
// mkfile or return the existing one. returns an error if it exists and is a
// directory. Essentially, this is open or create.
func (d *dir) mkfile(p string) (*file, error) {
n := d.find(p)
if n.path() == p {
if n.isdir() {
return nil, errIsDir
}
return n.(*file), nil
}
dirpath, filename := path.Split(p)
// Make any non-existent directories
n, err := d.mkdirs(dirpath)
if err != nil {
return nil, err
}
dd := n.(*dir)
n = &file{
common: common{
p: path.Join(dd.path(), filename),
mod: time.Now(),
},
}
dd.add(n)
return n.(*file), nil
}
// mkdirs creates any missing directory entries in p and returns the result.
func (d *dir) mkdirs(p string) (*dir, error) {
if p == "" {
p = "/"
}
n := d.find(p)
if !n.isdir() {
// Found something there
return nil, errIsNotDir
}
if n.path() == p {
return n.(*dir), nil
}
dd := n.(*dir)
relative := strings.Trim(strings.TrimPrefix(p, n.path()), "/")
if relative == "" {
return dd, nil
}
components := strings.Split(relative, "/")
for _, component := range components {
d, err := dd.mkdir(component)
if err != nil {
// This should actually never happen, since there are no children.
return nil, err
}
dd = d
}
return dd, nil
}
// mkdir creates a child directory under d with the given name.
func (d *dir) mkdir(name string) (*dir, error) {
if name == "" {
return nil, fmt.Errorf("invalid dirname")
}
_, ok := d.children[name]
if ok {
return nil, errExists
}
child := &dir{
common: common{
p: path.Join(d.path(), name),
mod: time.Now(),
},
}
d.add(child)
d.mod = time.Now()
return child, nil
}
func (d *dir) move(src, dst string) error {
dstDirname, _ := path.Split(dst)
dp, err := d.mkdirs(dstDirname)
if err != nil {
return err
}
srcDirname, srcFilename := path.Split(src)
sp := d.find(srcDirname)
if sp.path() != srcDirname {
return errNotExists
}
s, ok := sp.(*dir).children[srcFilename]
if !ok {
return errNotExists
}
delete(sp.(*dir).children, srcFilename)
switch n := s.(type) {
case *dir:
n.p = dst
case *file:
n.p = dst
}
dp.add(s)
return nil
}
func (d *dir) delete(p string) error {
dirname, filename := path.Split(p)
parent := d.find(dirname)
if dirname != parent.path() {
return errNotExists
}
if _, ok := parent.(*dir).children[filename]; !ok {
return errNotExists
}
delete(parent.(*dir).children, filename)
return nil
}
// dump outputs a primitive directory structure to stdout.
func (d *dir) dump(indent string) {
fmt.Println(indent, d.name()+"/")
for _, child := range d.children {
if child.isdir() {
child.(*dir).dump(indent + "\t")
} else {
fmt.Println(indent, child.name())
}
}
}
func (d *dir) String() string {
return fmt.Sprintf("&dir{path: %v, children: %v}", d.p, d.children)
}
// file stores actual data in the fs tree. It acts like an open, seekable file
// where operations are conducted through ReadAt and WriteAt. Use it with
// SectionReader for the best effect.
type file struct {
common
data []byte
}
var _ node = &file{}
func (f *file) isdir() bool {
return false
}
func (f *file) truncate() {
f.data = f.data[:0]
}
func (f *file) sectionReader(offset int64) io.Reader {
return io.NewSectionReader(f, offset, int64(len(f.data))-offset)
}
func (f *file) ReadAt(p []byte, offset int64) (n int, err error) {
return copy(p, f.data[offset:]), nil
}
func (f *file) WriteAt(p []byte, offset int64) (n int, err error) {
if len(f.data) > 0 && offset >= int64(len(f.data)) {
// Extend missing region with a zero pad, while also preallocating out to size of p.
pad := offset - int64(len(f.data))
size := len(p) + int(pad)
f.data = append(f.data, make([]byte, pad, size)...)
}
f.data = append(f.data, p...)
return len(p), nil
}
func (f *file) String() string {
return fmt.Sprintf("&file{path: %q}", f.p)
}
// common provides shared fields and methods for node implementations.
type common struct {
p string
mod time.Time
}
func (c *common) name() string {
_, name := path.Split(c.p)
return name
}
func (c *common) path() string {
return c.p
}
func (c *common) modtime() time.Time {
return c.mod
}

View file

@ -1,3 +1,5 @@
// +build ignore
package ipc
import (
@ -234,7 +236,7 @@ func (driver *StorageDriverClient) PutContent(path string, contents []byte) erro
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
func (driver *StorageDriverClient) ReadStream(path string, offset int64) (io.ReadCloser, error) {
if err := driver.exited(); err != nil {
return nil, err
}
@ -261,7 +263,7 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re
// WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path.
func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
func (driver *StorageDriverClient) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
if err := driver.exited(); err != nil {
return err
}

View file

@ -1,3 +1,5 @@
// +build ignore
package ipc
import (

View file

@ -1,3 +1,5 @@
// +build ignore
package ipc
import (
@ -100,7 +102,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) {
case "ReadStream":
path, _ := request.Parameters["Path"].(string)
// Depending on serialization method, Offset may be convereted to any int/uint type
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint()
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
reader, err := driver.ReadStream(path, offset)
var response ReadStreamResponse
if err != nil {
@ -115,9 +117,9 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) {
case "WriteStream":
path, _ := request.Parameters["Path"].(string)
// Depending on serialization method, Offset may be convereted to any int/uint type
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(uint64(0))).Uint()
offset := reflect.ValueOf(request.Parameters["Offset"]).Convert(reflect.TypeOf(int64(0))).Int()
// Depending on serialization method, Size may be convereted to any int/uint type
size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(uint64(0))).Uint()
size := reflect.ValueOf(request.Parameters["Size"]).Convert(reflect.TypeOf(int64(0))).Int()
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
err := driver.WriteStream(path, offset, size, reader)
response := WriteStreamResponse{

View file

@ -1,3 +1,5 @@
// +build ignore
package s3
import (
@ -17,7 +19,7 @@ const driverName = "s3"
// minChunkSize defines the minimum multipart upload chunk size
// S3 API requires multipart upload chunks to be at least 5MB
const minChunkSize = uint64(5 * 1024 * 1024)
const minChunkSize = 5 * 1024 * 1024
// listPartsMax is the largest amount of parts you can request from S3
const listPartsMax = 1000
@ -120,9 +122,9 @@ func (d *Driver) PutContent(path string, contents []byte) error {
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
headers := make(http.Header)
headers.Add("Range", "bytes="+strconv.FormatUint(offset, 10)+"-")
headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
resp, err := d.Bucket.GetResponseWithHeaders(path, headers)
if err != nil {
@ -133,22 +135,22 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
// WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path.
func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
defer reader.Close()
chunkSize := minChunkSize
chunkSize := int64(minChunkSize)
for size/chunkSize >= listPartsMax {
chunkSize *= 2
}
partNumber := 1
totalRead := uint64(0)
var totalRead int64
multi, parts, err := d.getAllParts(path)
if err != nil {
return err
}
if (offset) > uint64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) {
if (offset) > int64(len(parts))*chunkSize || (offset < size && offset%chunkSize != 0) {
return storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}
@ -161,11 +163,11 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo
buf := make([]byte, chunkSize)
for {
bytesRead, err := io.ReadFull(reader, buf)
totalRead += uint64(bytesRead)
totalRead += int64(bytesRead)
if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
return err
} else if (uint64(bytesRead) < chunkSize) && totalRead != size {
} else if (int64(bytesRead) < chunkSize) && totalRead != size {
break
} else {
part, err := multi.PutPart(int(partNumber), bytes.NewReader(buf[0:bytesRead]))

View file

@ -1,3 +1,5 @@
// +build ignore
package s3
import (

View file

@ -44,7 +44,7 @@ type StorageDriver interface {
// ReadStream retrieves an io.ReadCloser for the content stored at "path"
// with a given byte offset.
// May be used to resume reading a stream by providing a nonzero offset.
ReadStream(path string, offset uint64) (io.ReadCloser, error)
ReadStream(path string, offset int64) (io.ReadCloser, error)
// WriteStream stores the contents of the provided io.ReadCloser at a
// location designated by the given path.
@ -52,12 +52,11 @@ type StorageDriver interface {
// "size" bytes.
// May be used to resume writing a stream by providing a nonzero offset.
// The offset must be no larger than the CurrentSize for this path.
WriteStream(path string, offset, size uint64, readCloser io.ReadCloser) error
WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error)
// CurrentSize retrieves the curernt size in bytes of the object at the
// given path.
// It should be safe to read or write anywhere up to this point.
CurrentSize(path string) (uint64, error)
// Stat retrieves the FileInfo for the given path, including the current
// size in bytes and the creation time.
Stat(path string) (FileInfo, error)
// List returns a list of the objects that are direct descendants of the
//given path.
@ -86,7 +85,7 @@ func (err PathNotFoundError) Error() string {
// invalid offset.
type InvalidOffsetError struct {
Path string
Offset uint64
Offset int64
}
func (err InvalidOffsetError) Error() string {

View file

@ -2,15 +2,17 @@ package testsuites
import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"os"
"path"
"sort"
"sync"
"testing"
"time"
"github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker-registry/storagedriver/ipc"
"gopkg.in/check.v1"
)
@ -30,29 +32,34 @@ func RegisterInProcessSuite(driverConstructor DriverConstructor, skipCheck SkipC
// RegisterIPCSuite registers a storage driver test suite which runs the named
// driver as a child process with the given parameters.
func RegisterIPCSuite(driverName string, ipcParams map[string]string, skipCheck SkipCheck) {
suite := &DriverSuite{
Constructor: func() (storagedriver.StorageDriver, error) {
d, err := ipc.NewDriverClient(driverName, ipcParams)
if err != nil {
return nil, err
}
err = d.Start()
if err != nil {
return nil, err
}
return d, nil
},
SkipCheck: skipCheck,
}
suite.Teardown = func() error {
if suite.StorageDriver == nil {
return nil
}
panic("ipc testing is disabled for now")
driverClient := suite.StorageDriver.(*ipc.StorageDriverClient)
return driverClient.Stop()
}
check.Suite(suite)
// NOTE(stevvooe): IPC testing is disabled for now. Uncomment the code
// block before and remove the panic when we phase it back in.
// suite := &DriverSuite{
// Constructor: func() (storagedriver.StorageDriver, error) {
// d, err := ipc.NewDriverClient(driverName, ipcParams)
// if err != nil {
// return nil, err
// }
// err = d.Start()
// if err != nil {
// return nil, err
// }
// return d, nil
// },
// SkipCheck: skipCheck,
// }
// suite.Teardown = func() error {
// if suite.StorageDriver == nil {
// return nil
// }
// driverClient := suite.StorageDriver.(*ipc.StorageDriverClient)
// return driverClient.Stop()
// }
// check.Suite(suite)
}
// SkipCheck is a function used to determine if a test suite should be skipped.
@ -167,52 +174,13 @@ func (suite *DriverSuite) TestWriteReadStreams4(c *check.C) {
suite.writeReadCompareStreams(c, filename, contents)
}
// TestContinueStreamAppend tests that a stream write can be appended to without
// corrupting the data.
func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) {
filename := randomString(32)
defer suite.StorageDriver.Delete(filename)
chunkSize := uint64(10 * 1024 * 1024)
contentsChunk1 := []byte(randomString(chunkSize))
contentsChunk2 := []byte(randomString(chunkSize))
contentsChunk3 := []byte(randomString(chunkSize))
fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)
err := suite.StorageDriver.WriteStream(filename, 0, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(contentsChunk1)))
c.Assert(err, check.IsNil)
offset, err := suite.StorageDriver.CurrentSize(filename)
c.Assert(err, check.IsNil)
if offset > chunkSize {
c.Fatalf("Offset too large, %d > %d", offset, chunkSize)
}
err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:2*chunkSize])))
c.Assert(err, check.IsNil)
offset, err = suite.StorageDriver.CurrentSize(filename)
c.Assert(err, check.IsNil)
if offset > 2*chunkSize {
c.Fatalf("Offset too large, %d > %d", offset, 2*chunkSize)
}
err = suite.StorageDriver.WriteStream(filename, offset, 3*chunkSize, ioutil.NopCloser(bytes.NewReader(fullContents[offset:])))
c.Assert(err, check.IsNil)
received, err := suite.StorageDriver.GetContent(filename)
c.Assert(err, check.IsNil)
c.Assert(received, check.DeepEquals, fullContents)
}
// TestReadStreamWithOffset tests that the appropriate data is streamed when
// reading with a given offset.
func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) {
filename := randomString(32)
defer suite.StorageDriver.Delete(filename)
chunkSize := uint64(32)
chunkSize := int64(32)
contentsChunk1 := []byte(randomString(chunkSize))
contentsChunk2 := []byte(randomString(chunkSize))
@ -245,8 +213,125 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) {
readContents, err = ioutil.ReadAll(reader)
c.Assert(err, check.IsNil)
c.Assert(readContents, check.DeepEquals, contentsChunk3)
// Ensure we get invalid offest for negative offsets.
reader, err = suite.StorageDriver.ReadStream(filename, -1)
c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{})
c.Assert(err.(storagedriver.InvalidOffsetError).Offset, check.Equals, int64(-1))
c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename)
c.Assert(reader, check.IsNil)
// Read past the end of the content and make sure we get a reader that
// returns 0 bytes and io.EOF
reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3)
c.Assert(err, check.IsNil)
defer reader.Close()
buf := make([]byte, chunkSize)
n, err := reader.Read(buf)
c.Assert(err, check.Equals, io.EOF)
c.Assert(n, check.Equals, 0)
// Check the N-1 boundary condition, ensuring we get 1 byte then io.EOF.
reader, err = suite.StorageDriver.ReadStream(filename, chunkSize*3-1)
c.Assert(err, check.IsNil)
defer reader.Close()
n, err = reader.Read(buf)
c.Assert(n, check.Equals, 1)
// We don't care whether the io.EOF comes on the this read or the first
// zero read, but the only error acceptable here is io.EOF.
if err != nil {
c.Assert(err, check.Equals, io.EOF)
}
// Any more reads should result in zero bytes and io.EOF
n, err = reader.Read(buf)
c.Assert(n, check.Equals, 0)
c.Assert(err, check.Equals, io.EOF)
}
// TestContinueStreamAppend tests that a stream write can be appended to without
// corrupting the data.
func (suite *DriverSuite) TestContinueStreamAppend(c *check.C) {
filename := randomString(32)
defer suite.StorageDriver.Delete(filename)
chunkSize := int64(10 * 1024 * 1024)
contentsChunk1 := []byte(randomString(chunkSize))
contentsChunk2 := []byte(randomString(chunkSize))
contentsChunk3 := []byte(randomString(chunkSize))
contentsChunk4 := []byte(randomString(chunkSize))
zeroChunk := make([]byte, int64(chunkSize))
fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)
nn, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(contentsChunk1))
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, int64(len(contentsChunk1)))
fi, err := suite.StorageDriver.Stat(filename)
c.Assert(err, check.IsNil)
c.Assert(fi, check.NotNil)
c.Assert(fi.Size(), check.Equals, int64(len(contentsChunk1)))
if fi.Size() > chunkSize {
c.Fatalf("Offset too large, %d > %d", fi.Size(), chunkSize)
}
nn, err = suite.StorageDriver.WriteStream(filename, fi.Size(), bytes.NewReader(contentsChunk2))
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, int64(len(contentsChunk2)))
fi, err = suite.StorageDriver.Stat(filename)
c.Assert(err, check.IsNil)
c.Assert(fi, check.NotNil)
c.Assert(fi.Size(), check.Equals, 2*chunkSize)
if fi.Size() > 2*chunkSize {
c.Fatalf("Offset too large, %d > %d", fi.Size(), 2*chunkSize)
}
nn, err = suite.StorageDriver.WriteStream(filename, fi.Size(), bytes.NewReader(fullContents[fi.Size():]))
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, int64(len(fullContents[fi.Size():])))
received, err := suite.StorageDriver.GetContent(filename)
c.Assert(err, check.IsNil)
c.Assert(received, check.DeepEquals, fullContents)
// Writing past size of file extends file (no offest error). We would like
// to write chunk 4 one chunk length past chunk 3. It should be successful
// and the resulting file will be 5 chunks long, with a chunk of all
// zeros.
fullContents = append(fullContents, zeroChunk...)
fullContents = append(fullContents, contentsChunk4...)
nn, err = suite.StorageDriver.WriteStream(filename, int64(len(fullContents))-chunkSize, bytes.NewReader(contentsChunk4))
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, chunkSize)
fi, err = suite.StorageDriver.Stat(filename)
c.Assert(err, check.IsNil)
c.Assert(fi, check.NotNil)
c.Assert(fi.Size(), check.Equals, int64(len(fullContents)))
received, err = suite.StorageDriver.GetContent(filename)
c.Assert(err, check.IsNil)
c.Assert(len(received), check.Equals, len(fullContents))
c.Assert(received[chunkSize*3:chunkSize*4], check.DeepEquals, zeroChunk)
c.Assert(received[chunkSize*4:chunkSize*5], check.DeepEquals, contentsChunk4)
c.Assert(received, check.DeepEquals, fullContents)
// Ensure that negative offsets return correct error.
nn, err = suite.StorageDriver.WriteStream(filename, -1, bytes.NewReader(zeroChunk))
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{})
c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename)
c.Assert(err.(storagedriver.InvalidOffsetError).Offset, check.Equals, int64(-1))
}
// TestReadNonexistentStream tests that reading a stream for a nonexistent path
@ -260,13 +345,13 @@ func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) {
// TestList checks the returned list of keys after populating a directory tree.
func (suite *DriverSuite) TestList(c *check.C) {
rootDirectory := "/" + randomString(uint64(8+rand.Intn(8)))
rootDirectory := "/" + randomString(int64(8+rand.Intn(8)))
defer suite.StorageDriver.Delete(rootDirectory)
parentDirectory := rootDirectory + "/" + randomString(uint64(8+rand.Intn(8)))
parentDirectory := rootDirectory + "/" + randomString(int64(8+rand.Intn(8)))
childFiles := make([]string, 50)
for i := 0; i < len(childFiles); i++ {
childFile := parentDirectory + "/" + randomString(uint64(8+rand.Intn(8)))
childFile := parentDirectory + "/" + randomString(int64(8+rand.Intn(8)))
childFiles[i] = childFile
err := suite.StorageDriver.PutContent(childFile, []byte(randomString(32)))
c.Assert(err, check.IsNil)
@ -286,6 +371,11 @@ func (suite *DriverSuite) TestList(c *check.C) {
sort.Strings(keys)
c.Assert(keys, check.DeepEquals, childFiles)
// A few checks to add here (check out #819 for more discussion on this):
// 1. Ensure that all paths are absolute.
// 2. Ensure that listings only include direct children.
// 3. Ensure that we only respond to directory listings that end with a slash (maybe?).
}
// TestMove checks that a moved object no longer exists at the source path and
@ -378,21 +468,75 @@ func (suite *DriverSuite) TestDeleteFolder(c *check.C) {
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
}
// TestStatCall runs verifies the implementation of the storagedriver's Stat call.
func (suite *DriverSuite) TestStatCall(c *check.C) {
content := randomString(4096)
dirPath := randomString(32)
fileName := randomString(32)
filePath := path.Join(dirPath, fileName)
// Call on non-existent file/dir, check error.
fi, err := suite.StorageDriver.Stat(filePath)
c.Assert(err, check.NotNil)
c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{})
c.Assert(fi, check.IsNil)
err = suite.StorageDriver.PutContent(filePath, []byte(content))
c.Assert(err, check.IsNil)
// Call on regular file, check results
start := time.Now().Truncate(time.Second) // truncated for filesystem
fi, err = suite.StorageDriver.Stat(filePath)
c.Assert(err, check.IsNil)
expectedModTime := time.Now()
c.Assert(fi, check.NotNil)
c.Assert(fi.Path(), check.Equals, filePath)
c.Assert(fi.Size(), check.Equals, int64(len(content)))
c.Assert(fi.IsDir(), check.Equals, false)
if start.After(fi.ModTime()) {
c.Fatalf("modtime %s before file created (%v)", fi.ModTime(), start)
}
if fi.ModTime().After(expectedModTime) {
c.Fatalf("modtime %s after file created (%v)", fi.ModTime(), expectedModTime)
}
// Call on directory
start = time.Now().Truncate(time.Second)
fi, err = suite.StorageDriver.Stat(dirPath)
c.Assert(err, check.IsNil)
expectedModTime = time.Now()
c.Assert(fi, check.NotNil)
c.Assert(fi.Path(), check.Equals, dirPath)
c.Assert(fi.Size(), check.Equals, int64(0))
c.Assert(fi.IsDir(), check.Equals, true)
if start.After(fi.ModTime()) {
c.Fatalf("modtime %s before file created (%v)", fi.ModTime(), start)
}
if fi.ModTime().After(expectedModTime) {
c.Fatalf("modtime %s after file created (%v)", fi.ModTime(), expectedModTime)
}
}
// TestConcurrentFileStreams checks that multiple *os.File objects can be passed
// in to WriteStream concurrently without hanging.
// TODO(bbland): fix this test...
func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) {
if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC {
c.Skip("Need to fix out-of-process concurrency")
}
// if _, isIPC := suite.StorageDriver.(*ipc.StorageDriverClient); isIPC {
// c.Skip("Need to fix out-of-process concurrency")
// }
doneChan := make(chan struct{})
var wg sync.WaitGroup
testStream := func(size int) {
testStream := func(size int64) {
defer wg.Done()
suite.testFileStreams(c, size)
doneChan <- struct{}{}
}
wg.Add(6)
go testStream(8 * 1024 * 1024)
go testStream(4 * 1024 * 1024)
go testStream(2 * 1024 * 1024)
@ -400,13 +544,10 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) {
go testStream(1024)
go testStream(64)
for i := 0; i < 6; i++ {
<-doneChan
}
wg.Wait()
}
func (suite *DriverSuite) testFileStreams(c *check.C, size int) {
func (suite *DriverSuite) testFileStreams(c *check.C, size int64) {
tf, err := ioutil.TempFile("", "tf")
c.Assert(err, check.IsNil)
defer os.Remove(tf.Name())
@ -414,7 +555,7 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int) {
tfName := path.Base(tf.Name())
defer suite.StorageDriver.Delete(tfName)
contents := []byte(randomString(uint64(size)))
contents := []byte(randomString(size))
_, err = tf.Write(contents)
c.Assert(err, check.IsNil)
@ -422,8 +563,9 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int) {
tf.Sync()
tf.Seek(0, os.SEEK_SET)
err = suite.StorageDriver.WriteStream(tfName, 0, uint64(size), tf)
nn, err := suite.StorageDriver.WriteStream(tfName, 0, tf)
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, size)
reader, err := suite.StorageDriver.ReadStream(tfName, 0)
c.Assert(err, check.IsNil)
@ -450,8 +592,9 @@ func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents
func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, contents []byte) {
defer suite.StorageDriver.Delete(filename)
err := suite.StorageDriver.WriteStream(filename, 0, uint64(len(contents)), ioutil.NopCloser(bytes.NewReader(contents)))
nn, err := suite.StorageDriver.WriteStream(filename, 0, bytes.NewReader(contents))
c.Assert(err, check.IsNil)
c.Assert(nn, check.Equals, int64(len(contents)))
reader, err := suite.StorageDriver.ReadStream(filename, 0)
c.Assert(err, check.IsNil)
@ -465,7 +608,7 @@ func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, c
var pathChars = []byte("abcdefghijklmnopqrstuvwxyz")
func randomString(length uint64) string {
func randomString(length int64) string {
b := make([]byte, length)
for i := range b {
b[i] = pathChars[rand.Intn(len(pathChars))]