From 0207adaa5c90ce6ef803b00191a5240ada5454fe Mon Sep 17 00:00:00 2001 From: Flavian Missi Date: Tue, 23 May 2023 10:42:39 +0200 Subject: [PATCH] registry/storage/driver/gcs: fix code to use updated gcs driver Signed-off-by: Flavian Missi --- registry/storage/driver/gcs/gcs.go | 242 ++++++++++++------------ registry/storage/driver/gcs/gcs_test.go | 29 ++- 2 files changed, 146 insertions(+), 125 deletions(-) diff --git a/registry/storage/driver/gcs/gcs.go b/registry/storage/driver/gcs/gcs.go index f2b6f49a..d47db2e1 100644 --- a/registry/storage/driver/gcs/gcs.go +++ b/registry/storage/driver/gcs/gcs.go @@ -19,6 +19,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "math/rand" @@ -32,6 +33,7 @@ import ( "strings" "time" + "cloud.google.com/go/storage" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/distribution/distribution/v3/registry/storage/driver/base" "github.com/distribution/distribution/v3/registry/storage/driver/factory" @@ -40,8 +42,8 @@ import ( "golang.org/x/oauth2/google" "golang.org/x/oauth2/jwt" "google.golang.org/api/googleapi" - "google.golang.org/cloud" - "google.golang.org/cloud/storage" + "google.golang.org/api/iterator" + "google.golang.org/api/option" ) const ( @@ -59,6 +61,8 @@ const ( var rangeHeader = regexp.MustCompile(`^bytes=([0-9])+-([0-9]+)$`) +var _ storagedriver.FileWriter = &writer{} + // driverParameters is a struct that encapsulates all of the driver parameters after all values have been set type driverParameters struct { bucket string @@ -68,6 +72,7 @@ type driverParameters struct { client *http.Client rootDirectory string chunkSize int + gcs *storage.Client // maxConcurrency limits the number of concurrent driver operations // to GCS, which ultimately increases reliability of many simultaneous @@ -97,6 +102,7 @@ type driver struct { privateKey []byte rootDirectory string chunkSize int + gcs *storage.Client } // Wrapper wraps `driver` with a throttler, ensuring that no more than N @@ -113,6 +119,7 @@ type baseEmbed struct { // Required parameters: // - bucket func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { + ctx := context.TODO() bucket, ok := parameters["bucket"] if !ok || fmt.Sprint(bucket) == "" { return nil, fmt.Errorf("No bucket parameter provided") @@ -150,6 +157,8 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri var ts oauth2.TokenSource jwtConf := new(jwt.Config) + var err error + var gcs *storage.Client if keyfile, ok := parameters["keyfile"]; ok { jsonKey, err := os.ReadFile(fmt.Sprint(keyfile)) if err != nil { @@ -159,7 +168,11 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri if err != nil { return nil, err } - ts = jwtConf.TokenSource(context.Background()) + ts = jwtConf.TokenSource(ctx) + gcs, err = storage.NewClient(ctx, option.WithCredentialsFile(fmt.Sprint(keyfile))) + if err != nil { + return nil, err + } } else if credentials, ok := parameters["credentials"]; ok { credentialMap, ok := credentials.(map[interface{}]interface{}) if !ok { @@ -184,10 +197,14 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri if err != nil { return nil, err } - ts = jwtConf.TokenSource(context.Background()) + ts = jwtConf.TokenSource(ctx) + gcs, err = storage.NewClient(ctx, option.WithCredentialsJSON(data)) + if err != nil { + return nil, err + } } else { var err error - ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl) + ts, err = google.DefaultTokenSource(ctx, storage.ScopeFullControl) if err != nil { return nil, err } @@ -198,14 +215,18 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri return nil, fmt.Errorf("maxconcurrency config error: %s", err) } + if gcs == nil { + panic("gcs client was nil") + } params := driverParameters{ bucket: fmt.Sprint(bucket), rootDirectory: fmt.Sprint(rootDirectory), email: jwtConf.Email, privateKey: jwtConf.PrivateKey, - client: oauth2.NewClient(context.Background(), ts), + client: oauth2.NewClient(ctx, ts), chunkSize: chunkSize, maxConcurrency: maxConcurrency, + gcs: gcs, } return New(params) @@ -227,6 +248,7 @@ func New(params driverParameters) (storagedriver.StorageDriver, error) { privateKey: params.privateKey, client: params.client, chunkSize: params.chunkSize, + gcs: params.gcs, } return &Wrapper{ @@ -246,15 +268,9 @@ func (d *driver) Name() string { // GetContent retrieves the content stored at "path" as a []byte. // This should primarily be used for small objects. -func (d *driver) GetContent(context context.Context, path string) ([]byte, error) { - gcsContext := d.context(context) +func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { name := d.pathToKey(path) - var rc io.ReadCloser - err := retry(func() error { - var err error - rc, err = storage.NewReader(gcsContext, d.bucket, name) - return err - }) + rc, err := d.gcs.Bucket(d.bucket).Object(name).NewReader(ctx) if err == storage.ErrObjectNotExist { return nil, storagedriver.PathNotFoundError{Path: path} } @@ -272,18 +288,16 @@ func (d *driver) GetContent(context context.Context, path string) ([]byte, error // PutContent stores the []byte content at a location designated by "path". // This should primarily be used for small objects. -func (d *driver) PutContent(context context.Context, path string, contents []byte) error { - return retry(func() error { - wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path)) - wc.ContentType = "application/octet-stream" - return putContentsClose(wc, contents) - }) +func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { + wc := d.gcs.Bucket(d.bucket).Object(d.pathToKey(path)).NewWriter(ctx) + wc.ContentType = "application/octet-stream" + return putContentsClose(wc, contents) } // Reader retrieves an io.ReadCloser for the content stored at "path" // with a given byte offset. // May be used to resume reading a stream by providing a nonzero offset. -func (d *driver) Reader(context context.Context, path string, offset int64) (io.ReadCloser, error) { +func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { res, err := getObject(d.client, d.bucket, d.pathToKey(path), offset) if err != nil { if res != nil { @@ -294,7 +308,7 @@ func (d *driver) Reader(context context.Context, path string, offset int64) (io. if res.StatusCode == http.StatusRequestedRangeNotSatisfiable { res.Body.Close() - obj, err := storageStatObject(d.context(context), d.bucket, d.pathToKey(path)) + obj, err := d.storageStatObject(ctx, path) if err != nil { return nil, err } @@ -314,7 +328,7 @@ func (d *driver) Reader(context context.Context, path string, offset int64) (io. } func getObject(client *http.Client, bucket string, name string, offset int64) (*http.Response, error) { - // copied from google.golang.org/cloud/storage#NewReader : + // copied from cloud.google.com/go/storage#NewReader : // to set the additional "Range" header u := &url.URL{ Scheme: "https", @@ -342,12 +356,13 @@ func getObject(client *http.Client, bucket string, name string, offset int64) (* // Writer returns a FileWriter which will store the content written to it // at the location designated by "path" after the call to Commit. -func (d *driver) Writer(context context.Context, path string, append bool) (storagedriver.FileWriter, error) { +func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { writer := &writer{ client: d.client, bucket: d.bucket, name: d.pathToKey(path), buffer: make([]byte, d.chunkSize), + gcs: d.gcs, } if append { @@ -369,17 +384,16 @@ type writer struct { sessionURI string buffer []byte buffSize int + gcs *storage.Client } // Cancel removes any written content from this FileWriter. func (w *writer) Cancel(ctx context.Context) error { w.closed = true - err := storageDeleteObject(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name) + err := storageDeleteObject(ctx, w.bucket, w.name, w.gcs) if err != nil { - if status, ok := err.(*googleapi.Error); ok { - if status.Code == http.StatusNotFound { - err = nil - } + if err == storage.ErrObjectNotExist { + err = nil } } return err @@ -405,8 +419,9 @@ func (w *writer) Close() error { } // commit the writes by updating the upload session + ctx := context.TODO() err = retry(func() error { - wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name) + wc := w.gcs.Bucket(w.bucket).Object(w.name).NewWriter(ctx) wc.ContentType = uploadSessionContentType wc.Metadata = map[string]string{ "Session-URI": w.sessionURI, @@ -449,11 +464,12 @@ func (w *writer) Commit() error { return err } w.closed = true + ctx := context.TODO() // no session started yet just perform a simple upload if w.sessionURI == "" { err := retry(func() error { - wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name) + wc := w.gcs.Bucket(w.bucket).Object(w.name).NewWriter(ctx) wc.ContentType = "application/octet-stream" return putContentsClose(wc, w.buffer[0:w.buffSize]) }) @@ -536,6 +552,7 @@ func (w *writer) Write(p []byte) (int, error) { } nn += n } + w.size = w.offset + int64(w.buffSize) return nn, err } @@ -594,11 +611,10 @@ func retry(req request) error { // Stat retrieves the FileInfo for the given path, including the current // size in bytes and the creation time. -func (d *driver) Stat(context context.Context, path string) (storagedriver.FileInfo, error) { +func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { var fi storagedriver.FileInfoFields // try to get as file - gcsContext := d.context(context) - obj, err := storageStatObject(gcsContext, d.bucket, d.pathToKey(path)) + obj, err := d.storageStatObject(ctx, path) if err == nil { if obj.ContentType == uploadSessionContentType { return nil, storagedriver.PathNotFoundError{Path: path} @@ -617,20 +633,19 @@ func (d *driver) Stat(context context.Context, path string) (storagedriver.FileI var query *storage.Query query = &storage.Query{} query.Prefix = dirpath - query.MaxResults = 1 - objects, err := storageListObjects(gcsContext, d.bucket, query) + objects, err := storageListObjects(ctx, d.bucket, query, d.gcs) if err != nil { return nil, err } - if len(objects.Results) < 1 { + if len(objects) < 1 { return nil, storagedriver.PathNotFoundError{Path: path} } fi = storagedriver.FileInfoFields{ Path: path, IsDir: true, } - obj = objects.Results[0] + obj = objects[0] if obj.Name == dirpath { fi.Size = obj.Size fi.ModTime = obj.Updated @@ -640,34 +655,30 @@ func (d *driver) Stat(context context.Context, path string) (storagedriver.FileI // List returns a list of the objects that are direct descendants of the // given path. -func (d *driver) List(context context.Context, path string) ([]string, error) { +func (d *driver) List(ctx context.Context, path string) ([]string, error) { var query *storage.Query query = &storage.Query{} query.Delimiter = "/" query.Prefix = d.pathToDirKey(path) list := make([]string, 0, 64) - for { - objects, err := storageListObjects(d.context(context), d.bucket, query) - if err != nil { - return nil, err + objects, err := storageListObjects(ctx, d.bucket, query, d.gcs) + if err != nil { + return nil, err + } + for _, object := range objects { + // GCS does not guarantee strong consistency between + // DELETE and LIST operations. Check that the object is not deleted, + // and filter out any objects with a non-zero time-deleted + if object.Deleted.IsZero() && object.ContentType != uploadSessionContentType && object.Name != "" { + list = append(list, d.keyToPath(object.Name)) } - for _, object := range objects.Results { - // GCS does not guarantee strong consistency between - // DELETE and LIST operations. Check that the object is not deleted, - // and filter out any objects with a non-zero time-deleted - if object.Deleted.IsZero() && object.ContentType != uploadSessionContentType { - list = append(list, d.keyToPath(object.Name)) - } - } - for _, subpath := range objects.Prefixes { - subpath = d.keyToPath(subpath) + + if object.Name == "" && object.Prefix != "" { + subpath := d.keyToPath(object.Prefix) list = append(list, subpath) } - query = objects.Next - if query == nil { - break - } } + if path != "/" && len(list) == 0 { // Treat empty response as missing directory, since we don't actually // have directories in Google Cloud Storage. @@ -678,9 +689,8 @@ func (d *driver) List(context context.Context, path string) ([]string, error) { // Move moves an object stored at sourcePath to destPath, removing the // original object. -func (d *driver) Move(context context.Context, sourcePath string, destPath string) error { - gcsContext := d.context(context) - _, err := storageCopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil) +func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { + _, err := storageCopyObject(ctx, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil, d.gcs) if err != nil { if status, ok := err.(*googleapi.Error); ok { if status.Code == http.StatusNotFound { @@ -689,7 +699,7 @@ func (d *driver) Move(context context.Context, sourcePath string, destPath strin } return err } - err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath)) + err = storageDeleteObject(ctx, d.bucket, d.pathToKey(sourcePath), d.gcs) // if deleting the file fails, log the error, but do not fail; the file was successfully copied, // and the original should eventually be cleaned when purging the uploads folder. if err != nil { @@ -699,44 +709,37 @@ func (d *driver) Move(context context.Context, sourcePath string, destPath strin } // listAll recursively lists all names of objects stored at "prefix" and its subpaths. -func (d *driver) listAll(context context.Context, prefix string) ([]string, error) { +func (d *driver) listAll(ctx context.Context, prefix string) ([]string, error) { list := make([]string, 0, 64) query := &storage.Query{} query.Prefix = prefix query.Versions = false - for { - objects, err := storageListObjects(d.context(context), d.bucket, query) - if err != nil { - return nil, err - } - for _, obj := range objects.Results { - // GCS does not guarantee strong consistency between - // DELETE and LIST operations. Check that the object is not deleted, - // and filter out any objects with a non-zero time-deleted - if obj.Deleted.IsZero() { - list = append(list, obj.Name) - } - } - query = objects.Next - if query == nil { - break + objects, err := storageListObjects(ctx, d.bucket, query, d.gcs) + if err != nil { + return nil, err + } + for _, obj := range objects { + // GCS does not guarantee strong consistency between + // DELETE and LIST operations. Check that the object is not deleted, + // and filter out any objects with a non-zero time-deleted + if obj.Deleted.IsZero() { + list = append(list, obj.Name) } } return list, nil } // Delete recursively deletes all objects stored at "path" and its subpaths. -func (d *driver) Delete(context context.Context, path string) error { +func (d *driver) Delete(ctx context.Context, path string) error { prefix := d.pathToDirKey(path) - gcsContext := d.context(context) - keys, err := d.listAll(gcsContext, prefix) + keys, err := d.listAll(ctx, prefix) if err != nil { return err } if len(keys) > 0 { sort.Sort(sort.Reverse(sort.StringSlice(keys))) for _, key := range keys { - err := storageDeleteObject(gcsContext, d.bucket, key) + err := storageDeleteObject(ctx, d.bucket, key, d.gcs) // GCS only guarantees eventual consistency, so listAll might return // paths that no longer exist. If this happens, just ignore any not // found error @@ -751,57 +754,66 @@ func (d *driver) Delete(context context.Context, path string) error { } return nil } - err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(path)) - if err != nil { - if status, ok := err.(*googleapi.Error); ok { - if status.Code == http.StatusNotFound { - return storagedriver.PathNotFoundError{Path: path} - } - } + err = storageDeleteObject(ctx, d.bucket, d.pathToKey(path), d.gcs) + if err == storage.ErrObjectNotExist { + return storagedriver.PathNotFoundError{Path: path} } return err } -func storageDeleteObject(context context.Context, bucket string, name string) error { - return retry(func() error { - return storage.DeleteObject(context, bucket, name) - }) +func storageDeleteObject(ctx context.Context, bucket string, name string, gcs *storage.Client) error { + return gcs.Bucket(bucket).Object(name).Delete(ctx) } -func storageStatObject(context context.Context, bucket string, name string) (*storage.Object, error) { - var obj *storage.Object +func (d *driver) storageStatObject(ctx context.Context, name string) (*storage.ObjectAttrs, error) { + bkt := d.gcs.Bucket(d.bucket) + var obj *storage.ObjectAttrs err := retry(func() error { var err error - obj, err = storage.StatObject(context, bucket, name) + obj, err = bkt.Object(d.pathToKey(name)).Attrs(ctx) return err }) return obj, err } -func storageListObjects(context context.Context, bucket string, q *storage.Query) (*storage.Objects, error) { - var objs *storage.Objects - err := retry(func() error { - var err error - objs, err = storage.ListObjects(context, bucket, q) - return err - }) - return objs, err +func storageListObjects(ctx context.Context, bucket string, q *storage.Query, gcs *storage.Client) ([]*storage.ObjectAttrs, error) { + bkt := gcs.Bucket(bucket) + var objs []*storage.ObjectAttrs + it := bkt.Objects(ctx, q) + for { + objAttrs, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + return nil, err + } + objs = append(objs, objAttrs) + } + + return objs, nil } -func storageCopyObject(context context.Context, srcBucket, srcName string, destBucket, destName string, attrs *storage.ObjectAttrs) (*storage.Object, error) { - var obj *storage.Object - err := retry(func() error { - var err error - obj, err = storage.CopyObject(context, srcBucket, srcName, destBucket, destName, attrs) - return err - }) - return obj, err +func storageCopyObject(ctx context.Context, srcBucket, srcName string, destBucket, destName string, attrs *storage.ObjectAttrs, gcs *storage.Client) (*storage.ObjectAttrs, error) { + src := gcs.Bucket(srcBucket).Object(srcName) + dst := gcs.Bucket(destBucket).Object(destName) + attrs, err := dst.CopierFrom(src).Run(ctx) + if err != nil { + var status *googleapi.Error + if errors.As(err, &status) { + if status.Code == http.StatusNotFound { + return nil, storagedriver.PathNotFoundError{Path: srcName} + } + } + return nil, fmt.Errorf("Object(%q).CopierFrom(%q).Run: %w", destName, srcName, err) + } + return attrs, err } // URLFor returns a URL which may be used to retrieve the content stored at // the given path, possibly using the given options. // Returns ErrUnsupportedMethod if this driver has no privateKey -func (d *driver) URLFor(context context.Context, path string, options map[string]interface{}) (string, error) { +func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { if d.privateKey == nil { return "", storagedriver.ErrUnsupportedMethod{} } @@ -914,10 +926,6 @@ func putChunk(client *http.Client, sessionURI string, chunk []byte, from int64, return bytesPut, err } -func (d *driver) context(context context.Context) context.Context { - return cloud.WithContext(context, dummyProjectID, d.client) -} - func (d *driver) pathToKey(path string) string { return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/")) } diff --git a/registry/storage/driver/gcs/gcs_test.go b/registry/storage/driver/gcs/gcs_test.go index c4271cbd..74c590f3 100644 --- a/registry/storage/driver/gcs/gcs_test.go +++ b/registry/storage/driver/gcs/gcs_test.go @@ -8,13 +8,14 @@ import ( "os" "testing" + "cloud.google.com/go/storage" dcontext "github.com/distribution/distribution/v3/context" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/distribution/distribution/v3/registry/storage/driver/testsuites" "golang.org/x/oauth2" "golang.org/x/oauth2/google" "google.golang.org/api/googleapi" - "google.golang.org/cloud/storage" + "google.golang.org/api/option" "gopkg.in/check.v1" ) @@ -42,6 +43,11 @@ func init() { return } + jsonKey, err := os.ReadFile(credentials) + if err != nil { + panic(fmt.Sprintf("Error reading JSON key : %v", err)) + } + root, err := os.MkdirTemp("", "driver-") if err != nil { panic(err) @@ -55,7 +61,7 @@ func init() { if err != nil { // Assume that the file contents are within the environment variable since it exists // but does not contain a valid file path - jwtConfig, err := google.JWTConfigFromJSON([]byte(credentials), storage.ScopeFullControl) + jwtConfig, err := google.JWTConfigFromJSON(jsonKey, storage.ScopeFullControl) if err != nil { panic(fmt.Sprintf("Error reading JWT config : %s", err)) } @@ -70,14 +76,21 @@ func init() { ts = jwtConfig.TokenSource(dcontext.Background()) } + gcs, err := storage.NewClient(dcontext.Background(), option.WithCredentialsJSON(jsonKey)) + if err != nil { + panic(fmt.Sprintf("Error initializing gcs client : %v", err)) + } + gcsDriverConstructor = func(rootDirectory string) (storagedriver.StorageDriver, error) { parameters := driverParameters{ - bucket: bucket, - rootDirectory: root, - email: email, - privateKey: privateKey, - client: oauth2.NewClient(dcontext.Background(), ts), - chunkSize: defaultChunkSize, + bucket: bucket, + rootDirectory: root, + email: email, + privateKey: privateKey, + client: oauth2.NewClient(dcontext.Background(), ts), + chunkSize: defaultChunkSize, + gcs: gcs, + maxConcurrency: 8, } return New(parameters)