From cce4956131f8083ffaa5f032fd8ea8747a269117 Mon Sep 17 00:00:00 2001 From: Sylvain Baubeau Date: Mon, 11 May 2015 18:11:47 +0200 Subject: [PATCH] Add Openstack Swift storage driver Signed-off-by: Sylvain Baubeau --- docs/storage/driver/swift/swift.go | 519 ++++++++++++++++++++++++ docs/storage/driver/swift/swift_test.go | 141 +++++++ 2 files changed, 660 insertions(+) create mode 100644 docs/storage/driver/swift/swift.go create mode 100644 docs/storage/driver/swift/swift_test.go diff --git a/docs/storage/driver/swift/swift.go b/docs/storage/driver/swift/swift.go new file mode 100644 index 00000000..4a812e9e --- /dev/null +++ b/docs/storage/driver/swift/swift.go @@ -0,0 +1,519 @@ +// Package swift provides a storagedriver.StorageDriver implementation to +// store blobs in Openstack Swift object storage. +// +// This package leverages the ncw/swift client library for interfacing with +// Swift. +// +// Because Swift is a key, value store the Stat call does not support last modification +// time for directories (directories are an abstraction for key, value stores) +package swift + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + gopath "path" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/lebauce/swift" + + "github.com/docker/distribution/context" + storagedriver "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/base" + "github.com/docker/distribution/registry/storage/driver/factory" +) + +const driverName = "swift" + +const defaultChunkSize = 5 * 1024 * 1024 + +//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set +type DriverParameters struct { + Username string + Password string + AuthURL string + Tenant string + Region string + Container string + Prefix string + ChunkSize int64 +} + +type swiftInfo map[string]interface{} + +func init() { + factory.Register(driverName, &swiftDriverFactory{}) +} + +// swiftDriverFactory implements the factory.StorageDriverFactory interface +type swiftDriverFactory struct{} + +func (factory *swiftDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { + return FromParameters(parameters) +} + +type driver struct { + Conn swift.Connection + Container string + Prefix string + BulkDeleteSupport bool + ChunkSize int64 +} + +type baseEmbed struct { + base.Base +} + +// Driver is a storagedriver.StorageDriver implementation backed by Amazon Swift +// Objects are stored at absolute keys in the provided bucket. +type Driver struct { + baseEmbed +} + +// FromParameters constructs a new Driver with a given parameters map +// Required parameters: +// - username +// - password +// - authurl +// - container +func FromParameters(parameters map[string]interface{}) (*Driver, error) { + username, ok := parameters["username"] + if !ok || fmt.Sprint(username) == "" { + return nil, fmt.Errorf("No username parameter provided") + } + password, ok := parameters["password"] + if !ok || fmt.Sprint(password) == "" { + return nil, fmt.Errorf("No password parameter provided") + } + authURL, ok := parameters["authurl"] + if !ok || fmt.Sprint(authURL) == "" { + return nil, fmt.Errorf("No container parameter provided") + } + container, ok := parameters["container"] + if !ok || fmt.Sprint(container) == "" { + return nil, fmt.Errorf("No container parameter provided") + } + tenant, ok := parameters["tenant"] + if !ok { + tenant = "" + } + region, ok := parameters["region"] + if !ok { + region = "" + } + rootDirectory, ok := parameters["rootdirectory"] + if !ok { + rootDirectory = "" + } + chunkSize := int64(defaultChunkSize) + chunkSizeParam, ok := parameters["chunksize"] + if ok { + chunkSize, ok = chunkSizeParam.(int64) + if !ok { + return nil, fmt.Errorf("The chunksize parameter should be a number") + } + } + + params := DriverParameters{ + fmt.Sprint(username), + fmt.Sprint(password), + fmt.Sprint(authURL), + fmt.Sprint(tenant), + fmt.Sprint(region), + fmt.Sprint(container), + fmt.Sprint(rootDirectory), + chunkSize, + } + + return New(params) +} + +// New constructs a new Driver with the given Openstack Swift credentials and container name +func New(params DriverParameters) (*Driver, error) { + ct := swift.Connection{ + UserName: params.Username, + ApiKey: params.Password, + AuthUrl: params.AuthURL, + Region: params.Region, + UserAgent: "distribution", + Tenant: params.Tenant, + ConnectTimeout: 60 * time.Second, + Timeout: 15 * 60 * time.Second, + } + err := ct.Authenticate() + if err != nil { + return nil, fmt.Errorf("Swift authentication failed: %s", err) + } + + if err := ct.ContainerCreate(params.Container, nil); err != nil { + return nil, fmt.Errorf("Failed to create container %s (%s)", params.Container, err) + } + + if err := ct.ContainerCreate(params.Container + "_segments", nil); err != nil { + return nil, fmt.Errorf("Failed to create container %s (%s)", params.Container + "_segments", err) + } + + d := &driver{ + Conn: ct, + Container: params.Container, + Prefix: params.Prefix, + BulkDeleteSupport: detectBulkDelete(params.AuthURL), + ChunkSize: params.ChunkSize, + } + + return &Driver{ + baseEmbed: baseEmbed{ + Base: base.Base{ + StorageDriver: d, + }, + }, + }, nil +} + +// Implement the storagedriver.StorageDriver interface + +func (d *driver) Name() string { + return driverName +} + +// GetContent retrieves the content stored at "path" as a []byte. +func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { + content, err := d.Conn.ObjectGetBytes(d.Container, d.swiftPath(path)) + if err != nil { + return nil, parseError(path, err) + } + return content, nil +} + +// PutContent stores the []byte content at a location designated by "path". +func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { + if dir, err := d.createParentFolder(path); err != nil { + return parseError(dir, err) + } + err := d.Conn.ObjectPutBytes(d.Container, d.swiftPath(path), + contents, d.getContentType()) + return parseError(path, err) +} + +// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a +// given byte offset. +func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { + headers := make(swift.Headers) + headers["Range"] = "bytes=" + strconv.FormatInt(offset, 10) + "-" + + file, _, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers) + + if err != nil { + if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == 416 { + return ioutil.NopCloser(bytes.NewReader(nil)), nil + } + + return nil, parseError(path, err) + } + + return file, nil +} + +// WriteStream stores the contents of the provided io.Reader at a +// location designated by the given path. The driver will know it has +// received the full contents when the reader returns io.EOF. The number +// of successfully READ bytes will be returned, even if an error is +// returned. May be used to resume writing a stream by providing a nonzero +// offset. Offsets past the current size will write from the position +// beyond the end of the file. +func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (int64, error) { + var ( + segments []swift.Object + paddingReader io.Reader + ) + + partNumber := int64(1) + bytesRead := int64(0) + currentLength := int64(0) + zeroBuf := make([]byte, d.ChunkSize) + segmentsContainer := d.Container + "_segments" + cursor := int64(0) + + getSegment := func() string { + return d.swiftPath(path) + "/" + fmt.Sprintf("%016d", partNumber) + } + + max := func(a int64, b int64) int64 { + if a > b { + return a + } + return b + } + + info, _, err := d.Conn.Object(d.Container, d.swiftPath(path)) + if err != nil { + if swiftErr, ok := err.(*swift.Error); ok { + if swiftErr.StatusCode == 404 { + // Create a object manifest + if dir, err := d.createParentFolder(path); err != nil { + return bytesRead, parseError(dir, err) + } + headers := make(swift.Headers) + headers["X-Object-Manifest"] = segmentsContainer + "/" + d.swiftPath(path) + manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", + d.getContentType(), headers) + manifest.Close() + if err != nil { + return bytesRead, parseError(path, err) + } + } else { + return bytesRead, parseError(path, err) + } + } else { + return bytesRead, parseError(path, err) + } + } else { + // The manifest already exists. Get all the segments + currentLength = info.Bytes + headers := make(swift.Headers) + headers["Content-Type"] = "application/json" + opts := &swift.ObjectsOpts{Prefix: d.swiftPath(path), Headers: headers} + segments, err = d.Conn.Objects(d.Container + "_segments", opts) + if err != nil { + return bytesRead, parseError(path, err) + } + } + + // First, we skip the existing segments that are not modified by this call + for i := range segments { + if offset < cursor + segments[i].Bytes { + break + } + cursor += segments[i].Bytes + partNumber++ + } + + // We reached the end of the file but we haven't reached 'offset' yet + // Therefore we add blocks of zeros + if offset >= currentLength { + for offset - currentLength >= d.ChunkSize { + // Insert a block a zero + d.Conn.ObjectPut(segmentsContainer, getSegment(), + bytes.NewReader(zeroBuf), false, "", + d.getContentType(), nil) + currentLength += d.ChunkSize + partNumber++ + } + + cursor = currentLength + paddingReader = bytes.NewReader(zeroBuf) + } else { + // Offset is inside the current segment : we need to read the + // data from the beginning of the segment to offset + paddingReader, _, err = d.Conn.ObjectOpen(segmentsContainer, getSegment(), false, nil) + if err != nil { + return bytesRead, parseError(getSegment(), err) + } + } + + multi := io.MultiReader( + io.LimitReader(paddingReader, offset - cursor), + io.LimitReader(reader, d.ChunkSize - (offset - cursor)), + ) + + for { + currentSegment, err := d.Conn.ObjectCreate(segmentsContainer, getSegment(), false, "", d.getContentType(), nil) + if err != nil { + return bytesRead, parseError(path, err) + } + + n, err := io.Copy(currentSegment, multi) + if err != nil { + return bytesRead, parseError(path, err) + } + + if n < d.ChunkSize { + // We wrote all the data + if cursor + n < currentLength { + // Copy the end of the chunk + headers := make(swift.Headers) + headers["Range"] = "bytes=" + strconv.FormatInt(cursor + n, 10) + "-" + strconv.FormatInt(cursor + d.ChunkSize, 10) + file, _, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers) + if err != nil { + return bytesRead, parseError(path, err) + } + io.Copy(currentSegment, file) + file.Close() + } + if n > 0 { + currentSegment.Close() + bytesRead += n - max(0, offset - cursor) + } + break + } + + currentSegment.Close() + bytesRead += n - max(0, offset - cursor) + multi = io.MultiReader(io.LimitReader(reader, d.ChunkSize)) + cursor += d.ChunkSize + partNumber++ + } + + return bytesRead, nil +} + +// Stat retrieves the FileInfo for the given path, including the current size +// in bytes and the creation time. +func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { + info, _, err := d.Conn.Object(d.Container, d.swiftPath(path)) + if err != nil { + return nil, parseError(path, err) + } + + fi := storagedriver.FileInfoFields{ + Path: path, + IsDir: info.ContentType == "application/directory", + Size: info.Bytes, + ModTime: info.LastModified, + } + + return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil +} + +// List returns a list of the objects that are direct descendants of the given path. +func (d *driver) List(ctx context.Context, path string) ([]string, error) { + prefix := d.swiftPath(path) + if prefix != "" { + prefix += "/" + } + + opts := &swift.ObjectsOpts{ + Path: prefix, + Delimiter: '/', + } + + files, err := d.Conn.ObjectNames(d.Container, opts) + for index, name := range files { + files[index] = "/" + strings.TrimSuffix(name, "/") + } + + return files, parseError(path, err) +} + +// Move moves an object stored at sourcePath to destPath, removing the original +// object. +func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { + err := d.Conn.ObjectMove(d.Container, d.swiftPath(sourcePath), + d.Container, d.swiftPath(destPath)) + if err != nil { + return parseError(sourcePath, err) + } + + return nil +} + +// Delete recursively deletes all objects stored at "path" and its subpaths. +func (d *driver) Delete(ctx context.Context, path string) error { + opts := swift.ObjectsOpts{ + Prefix: d.swiftPath(path), + } + + objects, err := d.Conn.ObjectNamesAll(d.Container, &opts) + if err != nil { + return parseError(path, err) + } + if len(objects) == 0 { + return storagedriver.PathNotFoundError{Path: path} + } + + for index, name := range objects { + objects[index] = name[len(d.Prefix):] + } + + var multiDelete = true + if d.BulkDeleteSupport { + _, err := d.Conn.BulkDelete(d.Container, objects) + multiDelete = err != nil + } + if multiDelete { + for _, name := range objects { + if _, headers, err := d.Conn.Object(d.Container, name); err == nil { + manifest, ok := headers["X-Object-Manifest"] + if ok { + components := strings.SplitN(manifest, "/", 2) + segContainer := components[0] + segments, err := d.Conn.ObjectNamesAll(segContainer, &swift.ObjectsOpts{ Prefix: components[1] }) + if err != nil { + return parseError(name, err) + } + + for _, s := range segments { + if err := d.Conn.ObjectDelete(segContainer, s); err != nil { + return parseError(s, err) + } + } + } + } else { + return parseError(name, err) + } + + if err := d.Conn.ObjectDelete(d.Container, name); err != nil { + return parseError(name, err) + } + } + } + + return nil +} + +// URLFor returns a URL which may be used to retrieve the content stored at the given path. +// May return an UnsupportedMethodErr in certain StorageDriver implementations. +func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { + return "", storagedriver.ErrUnsupportedMethod +} + +func (d *driver) swiftPath(path string) string { + return strings.TrimLeft(strings.TrimRight(d.Prefix, "/")+path, "/") +} + +func (d *driver) createParentFolder(path string) (string, error) { + dir := gopath.Dir(path) + if dir != "/" { + _, _, err := d.Conn.Object(d.Container, d.swiftPath(dir)) + if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == 404 { + _, err := d.Conn.ObjectPut(d.Container, d.swiftPath(dir), bytes.NewReader(make([]byte, 0)), + false, "", "application/directory", nil) + if err != nil { + return dir, err + } + } + } + + return dir, nil +} + +func (d *driver) getContentType() string { + return "application/octet-stream" +} + +func detectBulkDelete(authURL string) (bulkDelete bool) { + resp, err := http.Get(filepath.Join(authURL, "..", "..") + "/info") + if err == nil { + defer resp.Body.Close() + decoder := json.NewDecoder(resp.Body) + var infos swiftInfo + if decoder.Decode(&infos) == nil { + _, bulkDelete = infos["bulk_delete"] + } + } + return +} + +func parseError(path string, err error) error { + if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == 404 { + return storagedriver.PathNotFoundError{Path: path} + } + + return err +} diff --git a/docs/storage/driver/swift/swift_test.go b/docs/storage/driver/swift/swift_test.go new file mode 100644 index 00000000..6038c319 --- /dev/null +++ b/docs/storage/driver/swift/swift_test.go @@ -0,0 +1,141 @@ +package swift + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/lebauce/swift/swifttest" + + "github.com/docker/distribution/context" + storagedriver "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/testsuites" + + "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { check.TestingT(t) } + +type SwiftDriverConstructor func(rootDirectory string) (*Driver, error) + +func init() { + var ( + username string + password string + authURL string + tenant string + container string + region string + prefix string + swiftServer *swifttest.SwiftServer + err error + ) + if username = os.Getenv("OS_USERNAME"); username == "" { + username = os.Getenv("ST_USER") + } + if password = os.Getenv("OS_PASSWORD"); password == "" { + password = os.Getenv("ST_KEY") + } + if authURL = os.Getenv("OS_AUTH_URL"); authURL == "" { + authURL = os.Getenv("ST_AUTH") + } + tenant = os.Getenv("OS_TENANT_NAME") + container = os.Getenv("OS_CONTAINER_NAME") + region = os.Getenv("OS_REGION_NAME") + prefix = os.Getenv("OS_CONTAINER_PREFIX") + + if username == "" || password == "" || authURL == "" || container == "" { + if swiftServer, err = swifttest.NewSwiftServer("localhost"); err != nil { + panic(err) + } + username = "swifttest" + password = "swifttest" + authURL = swiftServer.AuthURL + container = "test" + } + + root, err := ioutil.TempDir("", "driver-") + if err != nil { + panic(err) + } + defer os.Remove(root) + + swiftDriverConstructor := func(rootDirectory string) (*Driver, error) { + parameters := DriverParameters{ + username, + password, + authURL, + tenant, + region, + container, + prefix, + defaultChunkSize, + } + + return New(parameters) + } + + skipCheck := func() string { + return "" + } + + driverConstructor := func() (storagedriver.StorageDriver, error) { + return swiftDriverConstructor(root) + } + + testsuites.RegisterInProcessSuite(driverConstructor, skipCheck) + + RegisterSwiftDriverSuite(swiftDriverConstructor, skipCheck, swiftServer) +} + +func RegisterSwiftDriverSuite(swiftDriverConstructor SwiftDriverConstructor, skipCheck testsuites.SkipCheck, + swiftServer *swifttest.SwiftServer) { + check.Suite(&SwiftDriverSuite{ + Constructor: swiftDriverConstructor, + SkipCheck: skipCheck, + SwiftServer: swiftServer, + }) +} + +type SwiftDriverSuite struct { + Constructor SwiftDriverConstructor + SwiftServer *swifttest.SwiftServer + testsuites.SkipCheck +} + +func (suite *SwiftDriverSuite) SetUpSuite(c *check.C) { + if reason := suite.SkipCheck(); reason != "" { + c.Skip(reason) + } +} + +func (suite *SwiftDriverSuite) TestEmptyRootList(c *check.C) { + validRoot, err := ioutil.TempDir("", "driver-") + c.Assert(err, check.IsNil) + defer os.Remove(validRoot) + + rootedDriver, err := suite.Constructor(validRoot) + c.Assert(err, check.IsNil) + emptyRootDriver, err := suite.Constructor("") + c.Assert(err, check.IsNil) + slashRootDriver, err := suite.Constructor("/") + c.Assert(err, check.IsNil) + + filename := "/test" + contents := []byte("contents") + ctx := context.Background() + err = rootedDriver.PutContent(ctx, filename, contents) + c.Assert(err, check.IsNil) + defer rootedDriver.Delete(ctx, filename) + + keys, err := emptyRootDriver.List(ctx, "/") + for _, path := range keys { + c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true) + } + + keys, err = slashRootDriver.List(ctx, "/") + for _, path := range keys { + c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true) + } +}