From 98ad17f757f962843274a97070f4c48c6dcd5444 Mon Sep 17 00:00:00 2001 From: Arthur Baars Date: Mon, 20 Jul 2015 18:45:15 +0100 Subject: [PATCH] Storage driver for: Google Cloud Storage (gcs) Signed-off-by: Arthur Baars --- docs/storage/driver/gcs/doc.go | 3 + docs/storage/driver/gcs/gcs.go | 623 ++++++++++++++++++++++++++++ docs/storage/driver/gcs/gcs_test.go | 106 +++++ 3 files changed, 732 insertions(+) create mode 100644 docs/storage/driver/gcs/doc.go create mode 100644 docs/storage/driver/gcs/gcs.go create mode 100644 docs/storage/driver/gcs/gcs_test.go diff --git a/docs/storage/driver/gcs/doc.go b/docs/storage/driver/gcs/doc.go new file mode 100644 index 00000000..0f23ea78 --- /dev/null +++ b/docs/storage/driver/gcs/doc.go @@ -0,0 +1,3 @@ +// Package gcs implements the Google Cloud Storage driver backend. Support can be +// enabled by including the "include_gcs" build tag. +package gcs diff --git a/docs/storage/driver/gcs/gcs.go b/docs/storage/driver/gcs/gcs.go new file mode 100644 index 00000000..8dc96675 --- /dev/null +++ b/docs/storage/driver/gcs/gcs.go @@ -0,0 +1,623 @@ +// Package gcs provides a storagedriver.StorageDriver implementation to +// store blobs in Google cloud storage. +// +// This package leverages the google.golang.org/cloud/storage client library +//for interfacing with gcs. +// +// Because gcs is a key, value store the Stat call does not support last modification +// time for directories (directories are an abstraction for key, value stores) +// +// Keep in mind that gcs guarantees only eventual consistency, so do not assume +// that a successful write will mean immediate access to the data written (although +// in most regions a new object put has guaranteed read after write). The only true +// guarantee is that once you call Stat and receive a certain file size, that much of +// the file is already accessible. +// +// +build include_gcs + +package gcs + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "sort" + "strings" + "time" + + "golang.org/x/net/context" + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" + + "google.golang.org/api/googleapi" + storageapi "google.golang.org/api/storage/v1" + "google.golang.org/cloud" + "google.golang.org/cloud/storage" + + ctx "github.com/docker/distribution/context" + storagedriver "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/base" + "github.com/docker/distribution/registry/storage/driver/factory" +) + +const driverName = "gcs" +const dummyProjectID = "" + +//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set +type driverParameters struct { + bucket string + keyfile string + rootDirectory string +} + +func init() { + factory.Register(driverName, &gcsDriverFactory{}) +} + +// gcsDriverFactory implements the factory.StorageDriverFactory interface +type gcsDriverFactory struct{} + +// Create StorageDriver from parameters +func (factory *gcsDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { + return FromParameters(parameters) +} + +// driver is a storagedriver.StorageDriver implementation backed by GCS +// Objects are stored at absolute keys in the provided bucket. +type driver struct { + client *http.Client + bucket string + email string + privateKey []byte + rootDirectory string +} + +// FromParameters constructs a new Driver with a given parameters map +// Required parameters: +// - bucket +func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { + + bucket, ok := parameters["bucket"] + if !ok || fmt.Sprint(bucket) == "" { + return nil, fmt.Errorf("No bucket parameter provided") + } + + keyfile, ok := parameters["keyfile"] + if !ok { + keyfile = "" + } + + rootDirectory, ok := parameters["rootdirectory"] + if !ok { + rootDirectory = "" + } + params := driverParameters{ + fmt.Sprint(bucket), + fmt.Sprint(keyfile), + fmt.Sprint(rootDirectory), + } + + return New(params) +} + +// New constructs a new driver +func New(params driverParameters) (storagedriver.StorageDriver, error) { + var ts oauth2.TokenSource + var err error + rootDirectory := strings.Trim(params.rootDirectory, "/") + if rootDirectory != "" { + rootDirectory += "/" + } + d := &driver{ + bucket: params.bucket, + rootDirectory: rootDirectory, + } + if params.keyfile == "" { + ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl) + if err != nil { + return nil, err + } + } else { + jsonKey, err := ioutil.ReadFile(params.keyfile) + if err != nil { + return nil, err + } + conf, err := google.JWTConfigFromJSON( + jsonKey, + storage.ScopeFullControl, + ) + if err != nil { + return nil, err + } + ts = conf.TokenSource(context.Background()) + d.email = conf.Email + d.privateKey = conf.PrivateKey + } + client := oauth2.NewClient(context.Background(), ts) + d.client = client + if err != nil { + return nil, err + } + return &base.Base{ + StorageDriver: d, + }, nil +} + +// Implement the storagedriver.StorageDriver interface + +func (d *driver) Name() string { + return driverName +} + +// GetContent retrieves the content stored at "path" as a []byte. +// This should primarily be used for small objects. +func (d *driver) GetContent(context ctx.Context, path string) ([]byte, error) { + rc, err := d.ReadStream(context, path, 0) + if err != nil { + return nil, err + } + defer rc.Close() + + p, err := ioutil.ReadAll(rc) + if err != nil { + return nil, err + } + return p, nil +} + +// PutContent stores the []byte content at a location designated by "path". +// This should primarily be used for small objects. +func (d *driver) PutContent(context ctx.Context, path string, contents []byte) error { + wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path)) + wc.ContentType = "application/octet-stream" + defer wc.Close() + _, err := wc.Write(contents) + return err +} + +// ReadStream retrieves an io.ReadCloser for the content stored at "path" +// with a given byte offset. +// May be used to resume reading a stream by providing a nonzero offset. +func (d *driver) ReadStream(context ctx.Context, path string, offset int64) (io.ReadCloser, error) { + name := d.pathToKey(path) + + // copied from google.golang.org/cloud/storage#NewReader : + // to set the additional "Range" header + u := &url.URL{ + Scheme: "https", + Host: "storage.googleapis.com", + Path: fmt.Sprintf("/%s/%s", d.bucket, name), + } + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, err + } + if offset > 0 { + req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset)) + } + res, err := d.client.Do(req) + if err != nil { + return nil, err + } + if res.StatusCode == http.StatusNotFound { + res.Body.Close() + return nil, storagedriver.PathNotFoundError{Path: path} + } + if res.StatusCode == http.StatusRequestedRangeNotSatisfiable { + res.Body.Close() + obj, err := storage.StatObject(d.context(context), d.bucket, name) + if err != nil { + return nil, err + } + if offset == int64(obj.Size) { + return ioutil.NopCloser(bytes.NewReader([]byte{})), nil + } + return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + } + if res.StatusCode < 200 || res.StatusCode > 299 { + res.Body.Close() + return nil, fmt.Errorf("storage: can't read object %v/%v, status code: %v", d.bucket, name, res.Status) + } + return res.Body, nil +} + +// WriteStream stores the contents of the provided io.ReadCloser at a +// location designated by the given path. +// May be used to resume writing a stream by providing a nonzero offset. +// The offset must be no larger than the CurrentSize for this path. +func (d *driver) WriteStream(context ctx.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { + if offset < 0 { + return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset} + } + + if offset == 0 { + return d.writeCompletely(context, path, 0, reader) + } + + service, err := storageapi.New(d.client) + if err != nil { + return 0, err + } + objService := storageapi.NewObjectsService(service) + var obj *storageapi.Object + err = retry(5, func() error { + o, err := objService.Get(d.bucket, d.pathToKey(path)).Do() + obj = o + return err + }) + // obj, err := retry(5, objService.Get(d.bucket, d.pathToKey(path)).Do) + if err != nil { + return 0, err + } + + // cannot append more chunks, so redo from scratch + if obj.ComponentCount >= 1023 { + return d.writeCompletely(context, path, offset, reader) + } + + // skip from reader + objSize := int64(obj.Size) + nn, err := skip(reader, objSize-offset) + if err != nil { + return nn, err + } + + // Size <= offset + partName := fmt.Sprintf("%v#part-%d#", d.pathToKey(path), obj.ComponentCount) + gcsContext := d.context(context) + wc := storage.NewWriter(gcsContext, d.bucket, partName) + wc.ContentType = "application/octet-stream" + + if objSize < offset { + err = writeZeros(wc, offset-objSize) + if err != nil { + wc.CloseWithError(err) + return nn, err + } + } + n, err := io.Copy(wc, reader) + if err != nil { + wc.CloseWithError(err) + return nn, err + } + err = wc.Close() + if err != nil { + return nn, err + } + // wc was closed succesfully, so the temporary part exists, schedule it for deletion at the end + // of the function + defer storage.DeleteObject(gcsContext, d.bucket, partName) + + req := &storageapi.ComposeRequest{ + Destination: &storageapi.Object{Bucket: obj.Bucket, Name: obj.Name, ContentType: obj.ContentType}, + SourceObjects: []*storageapi.ComposeRequestSourceObjects{ + { + Name: obj.Name, + Generation: obj.Generation, + }, { + Name: partName, + Generation: wc.Object().Generation, + }}, + } + + err = retry(5, func() error { _, err := objService.Compose(d.bucket, obj.Name, req).Do(); return err }) + if err == nil { + nn = nn + n + } + + return nn, err +} + +type request func() error + +func retry(maxTries int, req request) error { + backoff := time.Second + var err error + for i := 0; i < maxTries; i++ { + err := req() + if err == nil { + return nil + } + + status := err.(*googleapi.Error) + if status == nil || (status.Code != 429 && status.Code < http.StatusInternalServerError) { + return err + } + + time.Sleep(backoff - time.Second + (time.Duration(rand.Int31n(1000)) * time.Millisecond)) + if i <= 4 { + backoff = backoff * 2 + } + } + return err +} + +func (d *driver) writeCompletely(context ctx.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { + wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path)) + wc.ContentType = "application/octet-stream" + defer wc.Close() + + // Copy the first offset bytes of the existing contents + // (padded with zeros if needed) into the writer + if offset > 0 { + existing, err := d.ReadStream(context, path, 0) + if err != nil { + return 0, err + } + defer existing.Close() + n, err := io.CopyN(wc, existing, offset) + if err == io.EOF { + err = writeZeros(wc, offset-n) + } + if err != nil { + return 0, err + } + } + return io.Copy(wc, reader) +} + +func skip(reader io.Reader, count int64) (int64, error) { + if count <= 0 { + return 0, nil + } + return io.CopyN(ioutil.Discard, reader, count) +} + +func writeZeros(wc io.Writer, count int64) error { + buf := make([]byte, 32*1024) + for count > 0 { + size := cap(buf) + if int64(size) > count { + size = int(count) + } + n, err := wc.Write(buf[0:size]) + if err != nil { + return err + } + count = count - int64(n) + } + return nil +} + +// Stat retrieves the FileInfo for the given path, including the current +// size in bytes and the creation time. +func (d *driver) Stat(context ctx.Context, path string) (storagedriver.FileInfo, error) { + var fi storagedriver.FileInfoFields + //try to get as file + gcsContext := d.context(context) + obj, err := storage.StatObject(gcsContext, d.bucket, d.pathToKey(path)) + if err == nil { + fi = storagedriver.FileInfoFields{ + Path: path, + Size: obj.Size, + ModTime: obj.Updated, + IsDir: false, + } + return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil + } + //try to get as folder + dirpath := d.pathToDirKey(path) + + var query *storage.Query + query = &storage.Query{} + query.Prefix = dirpath + query.MaxResults = 1 + + objects, err := storage.ListObjects(gcsContext, d.bucket, query) + if err != nil { + return nil, err + } + if len(objects.Results) < 1 { + return nil, storagedriver.PathNotFoundError{Path: path} + } + fi = storagedriver.FileInfoFields{ + Path: path, + IsDir: true, + } + obj = objects.Results[0] + if obj.Name == dirpath { + fi.Size = obj.Size + fi.ModTime = obj.Updated + } + return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil +} + +// List returns a list of the objects that are direct descendants of the +//given path. +func (d *driver) List(context ctx.Context, path string) ([]string, error) { + var query *storage.Query + query = &storage.Query{} + query.Delimiter = "/" + query.Prefix = d.pathToDirKey(path) + list := make([]string, 0, 64) + for { + objects, err := storage.ListObjects(d.context(context), d.bucket, query) + if err != nil { + return nil, err + } + for _, object := range objects.Results { + // GCS does not guarantee strong consistency between + // DELETE and LIST operationsCheck that the object is not deleted, + // so filter out any objects with a non-zero time-deleted + if object.Deleted.IsZero() { + name := object.Name + // Ignore objects with names that end with '#' (these are uploaded parts) + if name[len(name)-1] != '#' { + name = d.keyToPath(name) + list = append(list, name) + } + } + } + for _, subpath := range objects.Prefixes { + subpath = d.keyToPath(subpath) + list = append(list, subpath) + } + query = objects.Next + if query == nil { + break + } + } + return list, nil +} + +// Move moves an object stored at sourcePath to destPath, removing the +// original object. +func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) error { + prefix := d.pathToDirKey(sourcePath) + gcsContext := d.context(context) + keys, err := d.listAll(gcsContext, prefix) + if err != nil { + return err + } + if len(keys) > 0 { + destPrefix := d.pathToDirKey(destPath) + copies := make([]string, 0, len(keys)) + sort.Strings(keys) + var err error + for _, key := range keys { + dest := destPrefix + key[len(prefix):] + _, err = storage.CopyObject(gcsContext, d.bucket, key, d.bucket, dest, nil) + if err == nil { + copies = append(copies, dest) + } else { + break + } + } + // if an error occurred, attempt to cleanup the copies made + if err != nil { + for i := len(copies) - 1; i >= 0; i-- { + _ = storage.DeleteObject(gcsContext, d.bucket, copies[i]) + } + return err + } + // delete originals + for i := len(keys) - 1; i >= 0; i-- { + err2 := storage.DeleteObject(gcsContext, d.bucket, keys[i]) + if err2 != nil { + err = err2 + } + } + return err + } + _, err = storage.CopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil) + if err != nil { + if status := err.(*googleapi.Error); status != nil { + if status.Code == http.StatusNotFound { + return storagedriver.PathNotFoundError{Path: sourcePath} + } + } + return err + } + return storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath)) +} + +// listAll recursively lists all names of objects stored at "prefix" and its subpaths. +func (d *driver) listAll(context context.Context, prefix string) ([]string, error) { + list := make([]string, 0, 64) + query := &storage.Query{} + query.Prefix = prefix + query.Versions = false + for { + objects, err := storage.ListObjects(d.context(context), d.bucket, query) + if err != nil { + return nil, err + } + for _, obj := range objects.Results { + // GCS does not guarantee strong consistency between + // DELETE and LIST operationsCheck that the object is not deleted, + // so filter out any objects with a non-zero time-deleted + if obj.Deleted.IsZero() { + list = append(list, obj.Name) + } + } + query = objects.Next + if query == nil { + break + } + } + return list, nil +} + +// Delete recursively deletes all objects stored at "path" and its subpaths. +func (d *driver) Delete(context ctx.Context, path string) error { + prefix := d.pathToDirKey(path) + gcsContext := d.context(context) + keys, err := d.listAll(gcsContext, prefix) + if err != nil { + return err + } + if len(keys) > 0 { + sort.Sort(sort.Reverse(sort.StringSlice(keys))) + for _, key := range keys { + if err := storage.DeleteObject(gcsContext, d.bucket, key); err != nil { + return err + } + } + return nil + } + err = storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(path)) + if err != nil { + if status := err.(*googleapi.Error); status != nil { + if status.Code == http.StatusNotFound { + return storagedriver.PathNotFoundError{Path: path} + } + } + } + return err +} + +// URLFor returns a URL which may be used to retrieve the content stored at +// the given path, possibly using the given options. +// Returns ErrUnsupportedMethod if this driver has no privateKey +func (d *driver) URLFor(context ctx.Context, path string, options map[string]interface{}) (string, error) { + if d.privateKey == nil { + return "", storagedriver.ErrUnsupportedMethod + } + + name := d.pathToKey(path) + methodString := "GET" + method, ok := options["method"] + if ok { + methodString, ok = method.(string) + if !ok || (methodString != "GET" && methodString != "HEAD") { + return "", storagedriver.ErrUnsupportedMethod + } + } + + expiresTime := time.Now().Add(20 * time.Minute) + expires, ok := options["expiry"] + if ok { + et, ok := expires.(time.Time) + if ok { + expiresTime = et + } + } + + opts := &storage.SignedURLOptions{ + GoogleAccessID: d.email, + PrivateKey: d.privateKey, + Method: methodString, + Expires: expiresTime, + } + return storage.SignedURL(d.bucket, name, opts) +} + +func (d *driver) context(context ctx.Context) context.Context { + return cloud.WithContext(context, dummyProjectID, d.client) +} + +func (d *driver) pathToKey(path string) string { + return strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/") +} + +func (d *driver) pathToDirKey(path string) string { + return d.pathToKey(path) + "/" +} + +func (d *driver) keyToPath(key string) string { + return "/" + strings.Trim(strings.TrimPrefix(key, d.rootDirectory), "/") +} diff --git a/docs/storage/driver/gcs/gcs_test.go b/docs/storage/driver/gcs/gcs_test.go new file mode 100644 index 00000000..7afc4e70 --- /dev/null +++ b/docs/storage/driver/gcs/gcs_test.go @@ -0,0 +1,106 @@ +// +build include_gcs + +package gcs + +import ( + "io/ioutil" + "os" + "testing" + + ctx "github.com/docker/distribution/context" + storagedriver "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/testsuites" + + "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { check.TestingT(t) } + +var gcsDriverConstructor func(rootDirectory string) (storagedriver.StorageDriver, error) +var skipGCS func() string + +func init() { + bucket := os.Getenv("REGISTRY_STORAGE_GCS_BUCKET") + keyfile := os.Getenv("REGISTRY_STORAGE_GCS_KEYFILE") + credentials := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") + + root, err := ioutil.TempDir("", "driver-") + if err != nil { + panic(err) + } + defer os.Remove(root) + + gcsDriverConstructor = func(rootDirectory string) (storagedriver.StorageDriver, error) { + + parameters := driverParameters{ + bucket, + keyfile, + rootDirectory, + } + + return New(parameters) + } + + // Skip GCS storage driver tests if environment variable parameters are not provided + skipGCS = func() string { + if bucket == "" || (credentials == "" && keyfile == "") { + return "Must set REGISTRY_STORAGE_GCS_BUCKET and (GOOGLE_APPLICATION_CREDENTIALS or REGISTRY_STORAGE_GCS_KEYFILE) to run GCS tests" + } + return "" + } + + testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) { + return gcsDriverConstructor(root) + }, skipGCS) +} + +func TestEmptyRootList(t *testing.T) { + if skipGCS() != "" { + t.Skip(skipGCS()) + } + + validRoot, err := ioutil.TempDir("", "driver-") + if err != nil { + t.Fatalf("unexpected error creating temporary directory: %v", err) + } + defer os.Remove(validRoot) + + rootedDriver, err := gcsDriverConstructor(validRoot) + if err != nil { + t.Fatalf("unexpected error creating rooted driver: %v", err) + } + + emptyRootDriver, err := gcsDriverConstructor("") + if err != nil { + t.Fatalf("unexpected error creating empty root driver: %v", err) + } + + slashRootDriver, err := gcsDriverConstructor("/") + if err != nil { + t.Fatalf("unexpected error creating slash root driver: %v", err) + } + + filename := "/test" + contents := []byte("contents") + ctx := ctx.Background() + err = rootedDriver.PutContent(ctx, filename, contents) + if err != nil { + t.Fatalf("unexpected error creating content: %v", err) + } + defer rootedDriver.Delete(ctx, filename) + + keys, err := emptyRootDriver.List(ctx, "/") + for _, path := range keys { + if !storagedriver.PathRegexp.MatchString(path) { + t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp) + } + } + + keys, err = slashRootDriver.List(ctx, "/") + for _, path := range keys { + if !storagedriver.PathRegexp.MatchString(path) { + t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp) + } + } +}