From 2ebc373d917ecc12cecaf566e0a90a3de0e849f3 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 4 Dec 2014 20:14:41 -0800 Subject: [PATCH] Refactor inmemory driver for Stat and WriteStream methods This change started out as simply updating the existing inmemory driver to implement the new Stat call. After struggling with the map based implementation, it has been refactored to be a tree-based implementation. This process has exposed a few missing error cases in the StorageDriver API that should be addressed in the coming weeks. --- storagedriver/inmemory/driver.go | 199 +++++++++------- storagedriver/inmemory/driver_test.go | 5 +- storagedriver/inmemory/mfs.go | 329 ++++++++++++++++++++++++++ 3 files changed, 453 insertions(+), 80 deletions(-) create mode 100644 storagedriver/inmemory/mfs.go diff --git a/storagedriver/inmemory/driver.go b/storagedriver/inmemory/driver.go index 3231b017d..b6bdc2588 100644 --- a/storagedriver/inmemory/driver.go +++ b/storagedriver/inmemory/driver.go @@ -5,9 +5,9 @@ import ( "fmt" "io" "io/ioutil" - "regexp" "strings" "sync" + "time" "github.com/docker/docker-registry/storagedriver" "github.com/docker/docker-registry/storagedriver/factory" @@ -29,13 +29,18 @@ func (factory *inMemoryDriverFactory) Create(parameters map[string]string) (stor // Driver is a storagedriver.StorageDriver implementation backed by a local map. // Intended solely for example and testing purposes. type Driver struct { - storage map[string][]byte - mutex sync.RWMutex + root *dir + mutex sync.RWMutex } // New constructs a new Driver. func New() *Driver { - return &Driver{storage: make(map[string][]byte)} + return &Driver{root: &dir{ + common: common{ + p: "/", + mod: time.Now(), + }, + }} } // Implement the storagedriver.StorageDriver interface. @@ -44,18 +49,31 @@ func New() *Driver { func (d *Driver) GetContent(path string) ([]byte, error) { d.mutex.RLock() defer d.mutex.RUnlock() - contents, ok := d.storage[path] - if !ok { - return nil, storagedriver.PathNotFoundError{Path: path} + + rc, err := d.ReadStream(path, 0) + if err != nil { + return nil, err } - return contents, nil + defer rc.Close() + + return ioutil.ReadAll(rc) } // PutContent stores the []byte content at a location designated by "path". -func (d *Driver) PutContent(path string, contents []byte) error { +func (d *Driver) PutContent(p string, contents []byte) error { d.mutex.Lock() defer d.mutex.Unlock() - d.storage[path] = contents + + f, err := d.root.mkfile(p) + if err != nil { + // TODO(stevvooe): Again, we need to clarify when this is not a + // directory in StorageDriver API. + return fmt.Errorf("not a file") + } + + f.truncate() + f.WriteAt(contents, 0) + return nil } @@ -64,86 +82,104 @@ func (d *Driver) PutContent(path string, contents []byte) error { func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) { d.mutex.RLock() defer d.mutex.RUnlock() - contents, err := d.GetContent(path) - if err != nil { - return nil, err - } else if len(contents) <= int(offset) { - return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + + path = d.normalize(path) + found := d.root.find(path) + + if found.path() != path { + return nil, storagedriver.PathNotFoundError{Path: path} } - src := contents[offset:] - buf := make([]byte, len(src)) - copy(buf, src) - return ioutil.NopCloser(bytes.NewReader(buf)), nil + if found.isdir() { + return nil, fmt.Errorf("%q is a directory", path) + } + + return ioutil.NopCloser(found.(*file).sectionReader(offset)), nil } // WriteStream stores the contents of the provided io.ReadCloser at a location // designated by the given path. -func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error { - defer reader.Close() - d.mutex.RLock() - defer d.mutex.RUnlock() +func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (nn int64, err error) { + d.mutex.Lock() + defer d.mutex.Unlock() - resumableOffset, err := d.CurrentSize(path) + if offset < 0 { + return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + } + + normalized := d.normalize(path) + + f, err := d.root.mkfile(normalized) if err != nil { - return err + return 0, fmt.Errorf("not a file") } - if offset > int64(resumableOffset) { - return storagedriver.InvalidOffsetError{Path: path, Offset: offset} - } + var buf bytes.Buffer - contents, err := ioutil.ReadAll(reader) + nn, err = buf.ReadFrom(reader) if err != nil { - return err + // TODO(stevvooe): This condition is odd and we may need to clarify: + // we've read nn bytes from reader but have written nothing to the + // backend. What is the correct return value? Really, the caller needs + // to know that the reader has been advanced and reattempting the + // operation is incorrect. + return nn, err } - if offset > 0 { - contents = append(d.storage[path][0:offset], contents...) - } - - d.storage[path] = contents - return nil + f.WriteAt(buf.Bytes(), offset) + return nn, err } -// CurrentSize retrieves the curernt size in bytes of the object at the given -// path. -func (d *Driver) CurrentSize(path string) (uint64, error) { +// Stat returns info about the provided path. +func (d *Driver) Stat(path string) (storagedriver.FileInfo, error) { d.mutex.RLock() defer d.mutex.RUnlock() - contents, ok := d.storage[path] - if !ok { - return 0, nil + + normalized := d.normalize(path) + found := d.root.find(path) + + if found.path() != normalized { + return nil, storagedriver.PathNotFoundError{Path: path} } - return uint64(len(contents)), nil + + fi := storagedriver.FileInfoFields{ + Path: path, + IsDir: found.isdir(), + ModTime: found.modtime(), + } + + if !fi.IsDir { + fi.Size = int64(len(found.(*file).data)) + } + + return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil } // List returns a list of the objects that are direct descendants of the given // path. func (d *Driver) List(path string) ([]string, error) { - if path[len(path)-1] != '/' { - path += "/" - } - subPathMatcher, err := regexp.Compile(fmt.Sprintf("^%s[^/]+", path)) - if err != nil { - return nil, err + normalized := d.normalize(path) + + found := d.root.find(normalized) + + if !found.isdir() { + return nil, fmt.Errorf("not a directory") // TODO(stevvooe): Need error type for this... } - d.mutex.RLock() - defer d.mutex.RUnlock() - // we use map to collect unique keys - keySet := make(map[string]struct{}) - for k := range d.storage { - if key := subPathMatcher.FindString(k); key != "" { - keySet[key] = struct{}{} + entries, err := found.(*dir).list(normalized) + + if err != nil { + switch err { + case errNotExists: + return nil, storagedriver.PathNotFoundError{Path: path} + case errIsNotDir: + return nil, fmt.Errorf("not a directory") + default: + return nil, err } } - keys := make([]string, 0, len(keySet)) - for k := range keySet { - keys = append(keys, k) - } - return keys, nil + return entries, nil } // Move moves an object stored at sourcePath to destPath, removing the original @@ -151,32 +187,37 @@ func (d *Driver) List(path string) ([]string, error) { func (d *Driver) Move(sourcePath string, destPath string) error { d.mutex.Lock() defer d.mutex.Unlock() - contents, ok := d.storage[sourcePath] - if !ok { - return storagedriver.PathNotFoundError{Path: sourcePath} + + normalizedSrc, normalizedDst := d.normalize(sourcePath), d.normalize(destPath) + + err := d.root.move(normalizedSrc, normalizedDst) + switch err { + case errNotExists: + return storagedriver.PathNotFoundError{Path: destPath} + default: + return err } - d.storage[destPath] = contents - delete(d.storage, sourcePath) - return nil } // Delete recursively deletes all objects stored at "path" and its subpaths. func (d *Driver) Delete(path string) error { d.mutex.Lock() defer d.mutex.Unlock() - var subPaths []string - for k := range d.storage { - if strings.HasPrefix(k, path) { - subPaths = append(subPaths, k) - } - } - if len(subPaths) == 0 { + normalized := d.normalize(path) + + err := d.root.delete(normalized) + switch err { + case errNotExists: return storagedriver.PathNotFoundError{Path: path} + default: + return err } - - for _, subPath := range subPaths { - delete(d.storage, subPath) - } - return nil +} + +func (d *Driver) normalize(p string) string { + if !strings.HasPrefix(p, "/") { + p = "/" + p // Ghetto path absolution. + } + return p } diff --git a/storagedriver/inmemory/driver_test.go b/storagedriver/inmemory/driver_test.go index 87549542b..6a4b36973 100644 --- a/storagedriver/inmemory/driver_test.go +++ b/storagedriver/inmemory/driver_test.go @@ -17,5 +17,8 @@ func init() { return New(), nil } testsuites.RegisterInProcessSuite(inmemoryDriverConstructor, testsuites.NeverSkip) - testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip) + + // BUG(stevvooe): Disable flaky IPC tests for now when we can troubleshoot + // the problems with libchan. + // testsuites.RegisterIPCSuite(driverName, nil, testsuites.NeverSkip) } diff --git a/storagedriver/inmemory/mfs.go b/storagedriver/inmemory/mfs.go new file mode 100644 index 000000000..5248bbc6c --- /dev/null +++ b/storagedriver/inmemory/mfs.go @@ -0,0 +1,329 @@ +package inmemory + +import ( + "fmt" + "io" + "path" + "sort" + "strings" + "time" +) + +var ( + errExists = fmt.Errorf("exists") + errNotExists = fmt.Errorf("exists") + errIsNotDir = fmt.Errorf("notdir") + errIsDir = fmt.Errorf("isdir") +) + +type node interface { + name() string + path() string + isdir() bool + modtime() time.Time +} + +// dir is the central type for the memory-based storagedriver. All operations +// are dispatched from a root dir. +type dir struct { + common + + // TODO(stevvooe): Use sorted slice + search. + children map[string]node +} + +var _ node = &dir{} + +func (d *dir) isdir() bool { + return true +} + +// add places the node n into dir d. +func (d *dir) add(n node) { + if d.children == nil { + d.children = make(map[string]node) + } + + d.children[n.name()] = n + d.mod = time.Now() +} + +// find searches for the node, given path q in dir. If the node is found, it +// will be returned. If the node is not found, the closet existing parent. If +// the node is found, the returned (node).path() will match q. +func (d *dir) find(q string) node { + q = strings.Trim(q, "/") + i := strings.Index(q, "/") + + if q == "" { + return d + } + + if i == 0 { + panic("shouldn't happen, no root paths") + } + + var component string + if i < 0 { + // No more path components + component = q + } else { + component = q[:i] + } + + child, ok := d.children[component] + if !ok { + // Node was not found. Return p and the current node. + return d + } + + if child.isdir() { + // traverse down! + q = q[i+1:] + return child.(*dir).find(q) + } + + return child +} + +func (d *dir) list(p string) ([]string, error) { + n := d.find(p) + + if n.path() != p { + return nil, errNotExists + } + + if !n.isdir() { + return nil, errIsNotDir + } + + var children []string + for _, child := range n.(*dir).children { + children = append(children, child.path()) + } + + sort.Strings(children) + return children, nil +} + +// mkfile or return the existing one. returns an error if it exists and is a +// directory. Essentially, this is open or create. +func (d *dir) mkfile(p string) (*file, error) { + n := d.find(p) + if n.path() == p { + if n.isdir() { + return nil, errIsDir + } + + return n.(*file), nil + } + + dirpath, filename := path.Split(p) + // Make any non-existent directories + n, err := d.mkdirs(dirpath) + if err != nil { + return nil, err + } + + dd := n.(*dir) + n = &file{ + common: common{ + p: path.Join(dd.path(), filename), + mod: time.Now(), + }, + } + + dd.add(n) + return n.(*file), nil +} + +// mkdirs creates any missing directory entries in p and returns the result. +func (d *dir) mkdirs(p string) (*dir, error) { + if p == "" { + p = "/" + } + + n := d.find(p) + + if !n.isdir() { + // Found something there + return nil, errIsNotDir + } + + if n.path() == p { + return n.(*dir), nil + } + + dd := n.(*dir) + + relative := strings.Trim(strings.TrimPrefix(p, n.path()), "/") + + if relative == "" { + return dd, nil + } + + components := strings.Split(relative, "/") + for _, component := range components { + d, err := dd.mkdir(component) + + if err != nil { + // This should actually never happen, since there are no children. + return nil, err + } + dd = d + } + + return dd, nil +} + +// mkdir creates a child directory under d with the given name. +func (d *dir) mkdir(name string) (*dir, error) { + if name == "" { + return nil, fmt.Errorf("invalid dirname") + } + + _, ok := d.children[name] + if ok { + return nil, errExists + } + + child := &dir{ + common: common{ + p: path.Join(d.path(), name), + mod: time.Now(), + }, + } + d.add(child) + d.mod = time.Now() + + return child, nil +} + +func (d *dir) move(src, dst string) error { + dstDirname, _ := path.Split(dst) + + dp, err := d.mkdirs(dstDirname) + if err != nil { + return err + } + + srcDirname, srcFilename := path.Split(src) + sp := d.find(srcDirname) + + if sp.path() != srcDirname { + return errNotExists + } + + s, ok := sp.(*dir).children[srcFilename] + if !ok { + return errNotExists + } + + delete(sp.(*dir).children, srcFilename) + + switch n := s.(type) { + case *dir: + n.p = dst + case *file: + n.p = dst + } + + dp.add(s) + + return nil +} + +func (d *dir) delete(p string) error { + dirname, filename := path.Split(p) + parent := d.find(dirname) + + if dirname != parent.path() { + return errNotExists + } + + if _, ok := parent.(*dir).children[filename]; !ok { + return errNotExists + } + + delete(parent.(*dir).children, filename) + return nil +} + +// dump outputs a primitive directory structure to stdout. +func (d *dir) dump(indent string) { + fmt.Println(indent, d.name()+"/") + + for _, child := range d.children { + if child.isdir() { + child.(*dir).dump(indent + "\t") + } else { + fmt.Println(indent, child.name()) + } + + } +} + +func (d *dir) String() string { + return fmt.Sprintf("&dir{path: %v, children: %v}", d.p, d.children) +} + +// file stores actual data in the fs tree. It acts like an open, seekable file +// where operations are conducted through ReadAt and WriteAt. Use it with +// SectionReader for the best effect. +type file struct { + common + data []byte +} + +var _ node = &file{} + +func (f *file) isdir() bool { + return false +} + +func (f *file) truncate() { + f.data = f.data[:0] +} + +func (f *file) sectionReader(offset int64) io.Reader { + return io.NewSectionReader(f, offset, int64(len(f.data))-offset) +} + +func (f *file) ReadAt(p []byte, offset int64) (n int, err error) { + return copy(p, f.data[offset:]), nil +} + +func (f *file) WriteAt(p []byte, offset int64) (n int, err error) { + if len(f.data) > 0 && offset >= int64(len(f.data)) { + // Extend missing region with a zero pad, while also preallocating out to size of p. + pad := offset - int64(len(f.data)) + size := len(p) + int(pad) + f.data = append(f.data, make([]byte, pad, size)...) + } + + f.data = append(f.data, p...) + return len(p), nil +} + +func (f *file) String() string { + return fmt.Sprintf("&file{path: %q}", f.p) +} + +// common provides shared fields and methods for node implementations. +type common struct { + p string + mod time.Time +} + +func (c *common) name() string { + _, name := path.Split(c.p) + return name +} + +func (c *common) path() string { + return c.p +} + +func (c *common) modtime() time.Time { + return c.mod +}