From eea043dc7bc8aa404e8821041b412468ae936620 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Thu, 10 Mar 2016 16:46:43 -0800 Subject: [PATCH] Removes ceph rados driver in favor of Swift API gateway support Signed-off-by: Brian Bland --- docs/storage/driver/rados/doc.go | 3 - docs/storage/driver/rados/rados.go | 632 ------------------------ docs/storage/driver/rados/rados_test.go | 40 -- 3 files changed, 675 deletions(-) delete mode 100644 docs/storage/driver/rados/doc.go delete mode 100644 docs/storage/driver/rados/rados.go delete mode 100644 docs/storage/driver/rados/rados_test.go diff --git a/docs/storage/driver/rados/doc.go b/docs/storage/driver/rados/doc.go deleted file mode 100644 index 655c68a3..00000000 --- a/docs/storage/driver/rados/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package rados implements the rados storage driver backend. Support can be -// enabled by including the "include_rados" build tag. -package rados diff --git a/docs/storage/driver/rados/rados.go b/docs/storage/driver/rados/rados.go deleted file mode 100644 index c2be528e..00000000 --- a/docs/storage/driver/rados/rados.go +++ /dev/null @@ -1,632 +0,0 @@ -// +build include_rados - -package rados - -import ( - "bytes" - "encoding/binary" - "fmt" - "io" - "io/ioutil" - "path" - "strconv" - - log "github.com/Sirupsen/logrus" - "github.com/docker/distribution/context" - storagedriver "github.com/docker/distribution/registry/storage/driver" - "github.com/docker/distribution/registry/storage/driver/base" - "github.com/docker/distribution/registry/storage/driver/factory" - "github.com/docker/distribution/uuid" - "github.com/noahdesu/go-ceph/rados" -) - -const driverName = "rados" - -// Prefix all the stored blob -const objectBlobPrefix = "blob:" - -// Stripes objects size to 4M -const defaultChunkSize = 4 << 20 -const defaultXattrTotalSizeName = "total-size" - -// Max number of keys fetched from omap at each read operation -const defaultKeysFetched = 1 - -//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set -type DriverParameters struct { - poolname string - username string - chunksize uint64 -} - -func init() { - factory.Register(driverName, &radosDriverFactory{}) -} - -// radosDriverFactory implements the factory.StorageDriverFactory interface -type radosDriverFactory struct{} - -func (factory *radosDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { - return FromParameters(parameters) -} - -type driver struct { - Conn *rados.Conn - Ioctx *rados.IOContext - chunksize uint64 -} - -type baseEmbed struct { - base.Base -} - -// Driver is a storagedriver.StorageDriver implementation backed by Ceph RADOS -// Objects are stored at absolute keys in the provided bucket. -type Driver struct { - baseEmbed -} - -// FromParameters constructs a new Driver with a given parameters map -// Required parameters: -// - poolname: the ceph pool name -func FromParameters(parameters map[string]interface{}) (*Driver, error) { - - pool, ok := parameters["poolname"] - if !ok { - return nil, fmt.Errorf("No poolname parameter provided") - } - - username, ok := parameters["username"] - if !ok { - username = "" - } - - chunksize := uint64(defaultChunkSize) - chunksizeParam, ok := parameters["chunksize"] - if ok { - chunksize, ok = chunksizeParam.(uint64) - if !ok { - return nil, fmt.Errorf("The chunksize parameter should be a number") - } - } - - params := DriverParameters{ - fmt.Sprint(pool), - fmt.Sprint(username), - chunksize, - } - - return New(params) -} - -// New constructs a new Driver -func New(params DriverParameters) (*Driver, error) { - var conn *rados.Conn - var err error - - if params.username != "" { - log.Infof("Opening connection to pool %s using user %s", params.poolname, params.username) - conn, err = rados.NewConnWithUser(params.username) - } else { - log.Infof("Opening connection to pool %s", params.poolname) - conn, err = rados.NewConn() - } - - if err != nil { - return nil, err - } - - err = conn.ReadDefaultConfigFile() - if err != nil { - return nil, err - } - - err = conn.Connect() - if err != nil { - return nil, err - } - - log.Infof("Connected") - - ioctx, err := conn.OpenIOContext(params.poolname) - - log.Infof("Connected to pool %s", params.poolname) - - if err != nil { - return nil, err - } - - d := &driver{ - Ioctx: ioctx, - Conn: conn, - chunksize: params.chunksize, - } - - return &Driver{ - baseEmbed: baseEmbed{ - Base: base.Base{ - StorageDriver: d, - }, - }, - }, nil -} - -// Implement the storagedriver.StorageDriver interface - -func (d *driver) Name() string { - return driverName -} - -// GetContent retrieves the content stored at "path" as a []byte. -func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { - rc, err := d.ReadStream(ctx, path, 0) - if err != nil { - return nil, err - } - defer rc.Close() - - p, err := ioutil.ReadAll(rc) - if err != nil { - return nil, err - } - - return p, nil -} - -// PutContent stores the []byte content at a location designated by "path". -func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { - if _, err := d.WriteStream(ctx, path, 0, bytes.NewReader(contents)); err != nil { - return err - } - - return nil -} - -// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a -// given byte offset. -type readStreamReader struct { - driver *driver - oid string - size uint64 - offset uint64 -} - -func (r *readStreamReader) Read(b []byte) (n int, err error) { - // Determine the part available to read - bufferOffset := uint64(0) - bufferSize := uint64(len(b)) - - // End of the object, read less than the buffer size - if bufferSize > r.size-r.offset { - bufferSize = r.size - r.offset - } - - // Fill `b` - for bufferOffset < bufferSize { - // Get the offset in the object chunk - chunkedOid, chunkedOffset := r.driver.getChunkNameFromOffset(r.oid, r.offset) - - // Determine the best size to read - bufferEndOffset := bufferSize - if bufferEndOffset-bufferOffset > r.driver.chunksize-chunkedOffset { - bufferEndOffset = bufferOffset + (r.driver.chunksize - chunkedOffset) - } - - // Read the chunk - n, err = r.driver.Ioctx.Read(chunkedOid, b[bufferOffset:bufferEndOffset], chunkedOffset) - - if err != nil { - return int(bufferOffset), err - } - - bufferOffset += uint64(n) - r.offset += uint64(n) - } - - // EOF if the offset is at the end of the object - if r.offset == r.size { - return int(bufferOffset), io.EOF - } - - return int(bufferOffset), nil -} - -func (r *readStreamReader) Close() error { - return nil -} - -func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { - // get oid from filename - oid, err := d.getOid(path) - - if err != nil { - return nil, err - } - - // get object stat - stat, err := d.Stat(ctx, path) - - if err != nil { - return nil, err - } - - if offset > stat.Size() { - return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} - } - - return &readStreamReader{ - driver: d, - oid: oid, - size: uint64(stat.Size()), - offset: uint64(offset), - }, nil -} - -func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { - buf := make([]byte, d.chunksize) - totalRead = 0 - - oid, err := d.getOid(path) - if err != nil { - switch err.(type) { - // Trying to write new object, generate new blob identifier for it - case storagedriver.PathNotFoundError: - oid = d.generateOid() - err = d.putOid(path, oid) - if err != nil { - return 0, err - } - default: - return 0, err - } - } else { - // Check total object size only for existing ones - totalSize, err := d.getXattrTotalSize(ctx, oid) - if err != nil { - return 0, err - } - - // If offset if after the current object size, fill the gap with zeros - for totalSize < uint64(offset) { - sizeToWrite := d.chunksize - if totalSize-uint64(offset) < sizeToWrite { - sizeToWrite = totalSize - uint64(offset) - } - - chunkName, chunkOffset := d.getChunkNameFromOffset(oid, uint64(totalSize)) - err = d.Ioctx.Write(chunkName, buf[:sizeToWrite], uint64(chunkOffset)) - if err != nil { - return totalRead, err - } - - totalSize += sizeToWrite - } - } - - // Writer - for { - // Align to chunk size - sizeRead := uint64(0) - sizeToRead := uint64(offset+totalRead) % d.chunksize - if sizeToRead == 0 { - sizeToRead = d.chunksize - } - - // Read from `reader` - for sizeRead < sizeToRead { - nn, err := reader.Read(buf[sizeRead:sizeToRead]) - sizeRead += uint64(nn) - - if err != nil { - if err != io.EOF { - return totalRead, err - } - - break - } - } - - // End of file and nothing was read - if sizeRead == 0 { - break - } - - // Write chunk object - chunkName, chunkOffset := d.getChunkNameFromOffset(oid, uint64(offset+totalRead)) - err = d.Ioctx.Write(chunkName, buf[:sizeRead], uint64(chunkOffset)) - - if err != nil { - return totalRead, err - } - - // Update total object size as xattr in the first chunk of the object - err = d.setXattrTotalSize(oid, uint64(offset+totalRead)+sizeRead) - if err != nil { - return totalRead, err - } - - totalRead += int64(sizeRead) - - // End of file - if sizeRead < sizeToRead { - break - } - } - - return totalRead, nil -} - -// Stat retrieves the FileInfo for the given path, including the current size -func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { - // get oid from filename - oid, err := d.getOid(path) - - if err != nil { - return nil, err - } - - // the path is a virtual directory? - if oid == "" { - return storagedriver.FileInfoInternal{ - FileInfoFields: storagedriver.FileInfoFields{ - Path: path, - Size: 0, - IsDir: true, - }, - }, nil - } - - // stat first chunk - stat, err := d.Ioctx.Stat(oid + "-0") - - if err != nil { - return nil, err - } - - // get total size of chunked object - totalSize, err := d.getXattrTotalSize(ctx, oid) - - if err != nil { - return nil, err - } - - return storagedriver.FileInfoInternal{ - FileInfoFields: storagedriver.FileInfoFields{ - Path: path, - Size: int64(totalSize), - ModTime: stat.ModTime, - }, - }, nil -} - -// List returns a list of the objects that are direct descendants of the given path. -func (d *driver) List(ctx context.Context, dirPath string) ([]string, error) { - files, err := d.listDirectoryOid(dirPath) - - if err != nil { - return nil, storagedriver.PathNotFoundError{Path: dirPath} - } - - keys := make([]string, 0, len(files)) - for k := range files { - if k != dirPath { - keys = append(keys, path.Join(dirPath, k)) - } - } - - return keys, nil -} - -// Move moves an object stored at sourcePath to destPath, removing the original -// object. -func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { - // Get oid - oid, err := d.getOid(sourcePath) - - if err != nil { - return err - } - - // Move reference - err = d.putOid(destPath, oid) - - if err != nil { - return err - } - - // Delete old reference - err = d.deleteOid(sourcePath) - - if err != nil { - return err - } - - return nil -} - -// Delete recursively deletes all objects stored at "path" and its subpaths. -func (d *driver) Delete(ctx context.Context, objectPath string) error { - // Get oid - oid, err := d.getOid(objectPath) - - if err != nil { - return err - } - - // Deleting virtual directory - if oid == "" { - objects, err := d.listDirectoryOid(objectPath) - if err != nil { - return err - } - - for object := range objects { - err = d.Delete(ctx, path.Join(objectPath, object)) - if err != nil { - return err - } - } - } else { - // Delete object chunks - totalSize, err := d.getXattrTotalSize(ctx, oid) - - if err != nil { - return err - } - - for offset := uint64(0); offset < totalSize; offset += d.chunksize { - chunkName, _ := d.getChunkNameFromOffset(oid, offset) - - err = d.Ioctx.Delete(chunkName) - if err != nil { - return err - } - } - - // Delete reference - err = d.deleteOid(objectPath) - if err != nil { - return err - } - } - - return nil -} - -// URLFor returns a URL which may be used to retrieve the content stored at the given path. -// May return an UnsupportedMethodErr in certain StorageDriver implementations. -func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { - return "", storagedriver.ErrUnsupportedMethod{} -} - -// Generate a blob identifier -func (d *driver) generateOid() string { - return objectBlobPrefix + uuid.Generate().String() -} - -// Reference a object and its hierarchy -func (d *driver) putOid(objectPath string, oid string) error { - directory := path.Dir(objectPath) - base := path.Base(objectPath) - createParentReference := true - - // After creating this reference, skip the parents referencing since the - // hierarchy already exists - if oid == "" { - firstReference, err := d.Ioctx.GetOmapValues(directory, "", "", 1) - if (err == nil) && (len(firstReference) > 0) { - createParentReference = false - } - } - - oids := map[string][]byte{ - base: []byte(oid), - } - - // Reference object - err := d.Ioctx.SetOmap(directory, oids) - if err != nil { - return err - } - - // Esure parent virtual directories - if createParentReference { - return d.putOid(directory, "") - } - - return nil -} - -// Get the object identifier from an object name -func (d *driver) getOid(objectPath string) (string, error) { - directory := path.Dir(objectPath) - base := path.Base(objectPath) - - files, err := d.Ioctx.GetOmapValues(directory, "", base, 1) - - if (err != nil) || (files[base] == nil) { - return "", storagedriver.PathNotFoundError{Path: objectPath} - } - - return string(files[base]), nil -} - -// List the objects of a virtual directory -func (d *driver) listDirectoryOid(path string) (list map[string][]byte, err error) { - return d.Ioctx.GetAllOmapValues(path, "", "", defaultKeysFetched) -} - -// Remove a file from the files hierarchy -func (d *driver) deleteOid(objectPath string) error { - // Remove object reference - directory := path.Dir(objectPath) - base := path.Base(objectPath) - err := d.Ioctx.RmOmapKeys(directory, []string{base}) - - if err != nil { - return err - } - - // Remove virtual directory if empty (no more references) - firstReference, err := d.Ioctx.GetOmapValues(directory, "", "", 1) - - if err != nil { - return err - } - - if len(firstReference) == 0 { - // Delete omap - err := d.Ioctx.Delete(directory) - - if err != nil { - return err - } - - // Remove reference on parent omaps - if directory != "" { - return d.deleteOid(directory) - } - } - - return nil -} - -// Takes an offset in an chunked object and return the chunk name and a new -// offset in this chunk object -func (d *driver) getChunkNameFromOffset(oid string, offset uint64) (string, uint64) { - chunkID := offset / d.chunksize - chunkedOid := oid + "-" + strconv.FormatInt(int64(chunkID), 10) - chunkedOffset := offset % d.chunksize - return chunkedOid, chunkedOffset -} - -// Set the total size of a chunked object `oid` -func (d *driver) setXattrTotalSize(oid string, size uint64) error { - // Convert uint64 `size` to []byte - xattr := make([]byte, binary.MaxVarintLen64) - binary.LittleEndian.PutUint64(xattr, size) - - // Save the total size as a xattr in the first chunk - return d.Ioctx.SetXattr(oid+"-0", defaultXattrTotalSizeName, xattr) -} - -// Get the total size of the chunked object `oid` stored as xattr -func (d *driver) getXattrTotalSize(ctx context.Context, oid string) (uint64, error) { - // Fetch xattr as []byte - xattr := make([]byte, binary.MaxVarintLen64) - xattrLength, err := d.Ioctx.GetXattr(oid+"-0", defaultXattrTotalSizeName, xattr) - - if err != nil { - return 0, err - } - - if xattrLength != len(xattr) { - context.GetLogger(ctx).Errorf("object %s xattr length mismatch: %d != %d", oid, xattrLength, len(xattr)) - return 0, storagedriver.PathNotFoundError{Path: oid} - } - - // Convert []byte as uint64 - totalSize := binary.LittleEndian.Uint64(xattr) - - return totalSize, nil -} diff --git a/docs/storage/driver/rados/rados_test.go b/docs/storage/driver/rados/rados_test.go deleted file mode 100644 index ce367fb5..00000000 --- a/docs/storage/driver/rados/rados_test.go +++ /dev/null @@ -1,40 +0,0 @@ -// +build include_rados - -package rados - -import ( - "os" - "testing" - - storagedriver "github.com/docker/distribution/registry/storage/driver" - "github.com/docker/distribution/registry/storage/driver/testsuites" - - "gopkg.in/check.v1" -) - -// Hook up gocheck into the "go test" runner. -func Test(t *testing.T) { check.TestingT(t) } - -func init() { - poolname := os.Getenv("RADOS_POOL") - username := os.Getenv("RADOS_USER") - - driverConstructor := func() (storagedriver.StorageDriver, error) { - parameters := DriverParameters{ - poolname, - username, - defaultChunkSize, - } - - return New(parameters) - } - - skipCheck := func() string { - if poolname == "" { - return "RADOS_POOL must be set to run Rado tests" - } - return "" - } - - testsuites.RegisterSuite(driverConstructor, skipCheck) -}