registry/storage/driver/gcs: fix code to use updated gcs driver

Signed-off-by: Flavian Missi <fmissi@redhat.com>
This commit is contained in:
Flavian Missi 2023-05-23 10:42:39 +02:00
parent d0bc83d8e4
commit 0207adaa5c
2 changed files with 146 additions and 125 deletions

View file

@ -19,6 +19,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
@ -32,6 +33,7 @@ import (
"strings"
"time"
"cloud.google.com/go/storage"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/base"
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
@ -40,8 +42,8 @@ import (
"golang.org/x/oauth2/google"
"golang.org/x/oauth2/jwt"
"google.golang.org/api/googleapi"
"google.golang.org/cloud"
"google.golang.org/cloud/storage"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
const (
@ -59,6 +61,8 @@ const (
var rangeHeader = regexp.MustCompile(`^bytes=([0-9])+-([0-9]+)$`)
var _ storagedriver.FileWriter = &writer{}
// driverParameters is a struct that encapsulates all of the driver parameters after all values have been set
type driverParameters struct {
bucket string
@ -68,6 +72,7 @@ type driverParameters struct {
client *http.Client
rootDirectory string
chunkSize int
gcs *storage.Client
// maxConcurrency limits the number of concurrent driver operations
// to GCS, which ultimately increases reliability of many simultaneous
@ -97,6 +102,7 @@ type driver struct {
privateKey []byte
rootDirectory string
chunkSize int
gcs *storage.Client
}
// Wrapper wraps `driver` with a throttler, ensuring that no more than N
@ -113,6 +119,7 @@ type baseEmbed struct {
// Required parameters:
// - bucket
func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
ctx := context.TODO()
bucket, ok := parameters["bucket"]
if !ok || fmt.Sprint(bucket) == "" {
return nil, fmt.Errorf("No bucket parameter provided")
@ -150,6 +157,8 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri
var ts oauth2.TokenSource
jwtConf := new(jwt.Config)
var err error
var gcs *storage.Client
if keyfile, ok := parameters["keyfile"]; ok {
jsonKey, err := os.ReadFile(fmt.Sprint(keyfile))
if err != nil {
@ -159,7 +168,11 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri
if err != nil {
return nil, err
}
ts = jwtConf.TokenSource(context.Background())
ts = jwtConf.TokenSource(ctx)
gcs, err = storage.NewClient(ctx, option.WithCredentialsFile(fmt.Sprint(keyfile)))
if err != nil {
return nil, err
}
} else if credentials, ok := parameters["credentials"]; ok {
credentialMap, ok := credentials.(map[interface{}]interface{})
if !ok {
@ -184,10 +197,14 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri
if err != nil {
return nil, err
}
ts = jwtConf.TokenSource(context.Background())
ts = jwtConf.TokenSource(ctx)
gcs, err = storage.NewClient(ctx, option.WithCredentialsJSON(data))
if err != nil {
return nil, err
}
} else {
var err error
ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl)
ts, err = google.DefaultTokenSource(ctx, storage.ScopeFullControl)
if err != nil {
return nil, err
}
@ -198,14 +215,18 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri
return nil, fmt.Errorf("maxconcurrency config error: %s", err)
}
if gcs == nil {
panic("gcs client was nil")
}
params := driverParameters{
bucket: fmt.Sprint(bucket),
rootDirectory: fmt.Sprint(rootDirectory),
email: jwtConf.Email,
privateKey: jwtConf.PrivateKey,
client: oauth2.NewClient(context.Background(), ts),
client: oauth2.NewClient(ctx, ts),
chunkSize: chunkSize,
maxConcurrency: maxConcurrency,
gcs: gcs,
}
return New(params)
@ -227,6 +248,7 @@ func New(params driverParameters) (storagedriver.StorageDriver, error) {
privateKey: params.privateKey,
client: params.client,
chunkSize: params.chunkSize,
gcs: params.gcs,
}
return &Wrapper{
@ -246,15 +268,9 @@ func (d *driver) Name() string {
// GetContent retrieves the content stored at "path" as a []byte.
// This should primarily be used for small objects.
func (d *driver) GetContent(context context.Context, path string) ([]byte, error) {
gcsContext := d.context(context)
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
name := d.pathToKey(path)
var rc io.ReadCloser
err := retry(func() error {
var err error
rc, err = storage.NewReader(gcsContext, d.bucket, name)
return err
})
rc, err := d.gcs.Bucket(d.bucket).Object(name).NewReader(ctx)
if err == storage.ErrObjectNotExist {
return nil, storagedriver.PathNotFoundError{Path: path}
}
@ -272,18 +288,16 @@ func (d *driver) GetContent(context context.Context, path string) ([]byte, error
// PutContent stores the []byte content at a location designated by "path".
// This should primarily be used for small objects.
func (d *driver) PutContent(context context.Context, path string, contents []byte) error {
return retry(func() error {
wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path))
wc.ContentType = "application/octet-stream"
return putContentsClose(wc, contents)
})
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
wc := d.gcs.Bucket(d.bucket).Object(d.pathToKey(path)).NewWriter(ctx)
wc.ContentType = "application/octet-stream"
return putContentsClose(wc, contents)
}
// Reader retrieves an io.ReadCloser for the content stored at "path"
// with a given byte offset.
// May be used to resume reading a stream by providing a nonzero offset.
func (d *driver) Reader(context context.Context, path string, offset int64) (io.ReadCloser, error) {
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
res, err := getObject(d.client, d.bucket, d.pathToKey(path), offset)
if err != nil {
if res != nil {
@ -294,7 +308,7 @@ func (d *driver) Reader(context context.Context, path string, offset int64) (io.
if res.StatusCode == http.StatusRequestedRangeNotSatisfiable {
res.Body.Close()
obj, err := storageStatObject(d.context(context), d.bucket, d.pathToKey(path))
obj, err := d.storageStatObject(ctx, path)
if err != nil {
return nil, err
}
@ -314,7 +328,7 @@ func (d *driver) Reader(context context.Context, path string, offset int64) (io.
}
func getObject(client *http.Client, bucket string, name string, offset int64) (*http.Response, error) {
// copied from google.golang.org/cloud/storage#NewReader :
// copied from cloud.google.com/go/storage#NewReader :
// to set the additional "Range" header
u := &url.URL{
Scheme: "https",
@ -342,12 +356,13 @@ func getObject(client *http.Client, bucket string, name string, offset int64) (*
// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
func (d *driver) Writer(context context.Context, path string, append bool) (storagedriver.FileWriter, error) {
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
writer := &writer{
client: d.client,
bucket: d.bucket,
name: d.pathToKey(path),
buffer: make([]byte, d.chunkSize),
gcs: d.gcs,
}
if append {
@ -369,17 +384,16 @@ type writer struct {
sessionURI string
buffer []byte
buffSize int
gcs *storage.Client
}
// Cancel removes any written content from this FileWriter.
func (w *writer) Cancel(ctx context.Context) error {
w.closed = true
err := storageDeleteObject(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
err := storageDeleteObject(ctx, w.bucket, w.name, w.gcs)
if err != nil {
if status, ok := err.(*googleapi.Error); ok {
if status.Code == http.StatusNotFound {
err = nil
}
if err == storage.ErrObjectNotExist {
err = nil
}
}
return err
@ -405,8 +419,9 @@ func (w *writer) Close() error {
}
// commit the writes by updating the upload session
ctx := context.TODO()
err = retry(func() error {
wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
wc := w.gcs.Bucket(w.bucket).Object(w.name).NewWriter(ctx)
wc.ContentType = uploadSessionContentType
wc.Metadata = map[string]string{
"Session-URI": w.sessionURI,
@ -449,11 +464,12 @@ func (w *writer) Commit() error {
return err
}
w.closed = true
ctx := context.TODO()
// no session started yet just perform a simple upload
if w.sessionURI == "" {
err := retry(func() error {
wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
wc := w.gcs.Bucket(w.bucket).Object(w.name).NewWriter(ctx)
wc.ContentType = "application/octet-stream"
return putContentsClose(wc, w.buffer[0:w.buffSize])
})
@ -536,6 +552,7 @@ func (w *writer) Write(p []byte) (int, error) {
}
nn += n
}
w.size = w.offset + int64(w.buffSize)
return nn, err
}
@ -594,11 +611,10 @@ func retry(req request) error {
// Stat retrieves the FileInfo for the given path, including the current
// size in bytes and the creation time.
func (d *driver) Stat(context context.Context, path string) (storagedriver.FileInfo, error) {
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
var fi storagedriver.FileInfoFields
// try to get as file
gcsContext := d.context(context)
obj, err := storageStatObject(gcsContext, d.bucket, d.pathToKey(path))
obj, err := d.storageStatObject(ctx, path)
if err == nil {
if obj.ContentType == uploadSessionContentType {
return nil, storagedriver.PathNotFoundError{Path: path}
@ -617,20 +633,19 @@ func (d *driver) Stat(context context.Context, path string) (storagedriver.FileI
var query *storage.Query
query = &storage.Query{}
query.Prefix = dirpath
query.MaxResults = 1
objects, err := storageListObjects(gcsContext, d.bucket, query)
objects, err := storageListObjects(ctx, d.bucket, query, d.gcs)
if err != nil {
return nil, err
}
if len(objects.Results) < 1 {
if len(objects) < 1 {
return nil, storagedriver.PathNotFoundError{Path: path}
}
fi = storagedriver.FileInfoFields{
Path: path,
IsDir: true,
}
obj = objects.Results[0]
obj = objects[0]
if obj.Name == dirpath {
fi.Size = obj.Size
fi.ModTime = obj.Updated
@ -640,34 +655,30 @@ func (d *driver) Stat(context context.Context, path string) (storagedriver.FileI
// List returns a list of the objects that are direct descendants of the
// given path.
func (d *driver) List(context context.Context, path string) ([]string, error) {
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
var query *storage.Query
query = &storage.Query{}
query.Delimiter = "/"
query.Prefix = d.pathToDirKey(path)
list := make([]string, 0, 64)
for {
objects, err := storageListObjects(d.context(context), d.bucket, query)
if err != nil {
return nil, err
objects, err := storageListObjects(ctx, d.bucket, query, d.gcs)
if err != nil {
return nil, err
}
for _, object := range objects {
// GCS does not guarantee strong consistency between
// DELETE and LIST operations. Check that the object is not deleted,
// and filter out any objects with a non-zero time-deleted
if object.Deleted.IsZero() && object.ContentType != uploadSessionContentType && object.Name != "" {
list = append(list, d.keyToPath(object.Name))
}
for _, object := range objects.Results {
// GCS does not guarantee strong consistency between
// DELETE and LIST operations. Check that the object is not deleted,
// and filter out any objects with a non-zero time-deleted
if object.Deleted.IsZero() && object.ContentType != uploadSessionContentType {
list = append(list, d.keyToPath(object.Name))
}
}
for _, subpath := range objects.Prefixes {
subpath = d.keyToPath(subpath)
if object.Name == "" && object.Prefix != "" {
subpath := d.keyToPath(object.Prefix)
list = append(list, subpath)
}
query = objects.Next
if query == nil {
break
}
}
if path != "/" && len(list) == 0 {
// Treat empty response as missing directory, since we don't actually
// have directories in Google Cloud Storage.
@ -678,9 +689,8 @@ func (d *driver) List(context context.Context, path string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the
// original object.
func (d *driver) Move(context context.Context, sourcePath string, destPath string) error {
gcsContext := d.context(context)
_, err := storageCopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
_, err := storageCopyObject(ctx, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil, d.gcs)
if err != nil {
if status, ok := err.(*googleapi.Error); ok {
if status.Code == http.StatusNotFound {
@ -689,7 +699,7 @@ func (d *driver) Move(context context.Context, sourcePath string, destPath strin
}
return err
}
err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath))
err = storageDeleteObject(ctx, d.bucket, d.pathToKey(sourcePath), d.gcs)
// if deleting the file fails, log the error, but do not fail; the file was successfully copied,
// and the original should eventually be cleaned when purging the uploads folder.
if err != nil {
@ -699,44 +709,37 @@ func (d *driver) Move(context context.Context, sourcePath string, destPath strin
}
// listAll recursively lists all names of objects stored at "prefix" and its subpaths.
func (d *driver) listAll(context context.Context, prefix string) ([]string, error) {
func (d *driver) listAll(ctx context.Context, prefix string) ([]string, error) {
list := make([]string, 0, 64)
query := &storage.Query{}
query.Prefix = prefix
query.Versions = false
for {
objects, err := storageListObjects(d.context(context), d.bucket, query)
if err != nil {
return nil, err
}
for _, obj := range objects.Results {
// GCS does not guarantee strong consistency between
// DELETE and LIST operations. Check that the object is not deleted,
// and filter out any objects with a non-zero time-deleted
if obj.Deleted.IsZero() {
list = append(list, obj.Name)
}
}
query = objects.Next
if query == nil {
break
objects, err := storageListObjects(ctx, d.bucket, query, d.gcs)
if err != nil {
return nil, err
}
for _, obj := range objects {
// GCS does not guarantee strong consistency between
// DELETE and LIST operations. Check that the object is not deleted,
// and filter out any objects with a non-zero time-deleted
if obj.Deleted.IsZero() {
list = append(list, obj.Name)
}
}
return list, nil
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driver) Delete(context context.Context, path string) error {
func (d *driver) Delete(ctx context.Context, path string) error {
prefix := d.pathToDirKey(path)
gcsContext := d.context(context)
keys, err := d.listAll(gcsContext, prefix)
keys, err := d.listAll(ctx, prefix)
if err != nil {
return err
}
if len(keys) > 0 {
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
for _, key := range keys {
err := storageDeleteObject(gcsContext, d.bucket, key)
err := storageDeleteObject(ctx, d.bucket, key, d.gcs)
// GCS only guarantees eventual consistency, so listAll might return
// paths that no longer exist. If this happens, just ignore any not
// found error
@ -751,57 +754,66 @@ func (d *driver) Delete(context context.Context, path string) error {
}
return nil
}
err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(path))
if err != nil {
if status, ok := err.(*googleapi.Error); ok {
if status.Code == http.StatusNotFound {
return storagedriver.PathNotFoundError{Path: path}
}
}
err = storageDeleteObject(ctx, d.bucket, d.pathToKey(path), d.gcs)
if err == storage.ErrObjectNotExist {
return storagedriver.PathNotFoundError{Path: path}
}
return err
}
func storageDeleteObject(context context.Context, bucket string, name string) error {
return retry(func() error {
return storage.DeleteObject(context, bucket, name)
})
func storageDeleteObject(ctx context.Context, bucket string, name string, gcs *storage.Client) error {
return gcs.Bucket(bucket).Object(name).Delete(ctx)
}
func storageStatObject(context context.Context, bucket string, name string) (*storage.Object, error) {
var obj *storage.Object
func (d *driver) storageStatObject(ctx context.Context, name string) (*storage.ObjectAttrs, error) {
bkt := d.gcs.Bucket(d.bucket)
var obj *storage.ObjectAttrs
err := retry(func() error {
var err error
obj, err = storage.StatObject(context, bucket, name)
obj, err = bkt.Object(d.pathToKey(name)).Attrs(ctx)
return err
})
return obj, err
}
func storageListObjects(context context.Context, bucket string, q *storage.Query) (*storage.Objects, error) {
var objs *storage.Objects
err := retry(func() error {
var err error
objs, err = storage.ListObjects(context, bucket, q)
return err
})
return objs, err
func storageListObjects(ctx context.Context, bucket string, q *storage.Query, gcs *storage.Client) ([]*storage.ObjectAttrs, error) {
bkt := gcs.Bucket(bucket)
var objs []*storage.ObjectAttrs
it := bkt.Objects(ctx, q)
for {
objAttrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
objs = append(objs, objAttrs)
}
return objs, nil
}
func storageCopyObject(context context.Context, srcBucket, srcName string, destBucket, destName string, attrs *storage.ObjectAttrs) (*storage.Object, error) {
var obj *storage.Object
err := retry(func() error {
var err error
obj, err = storage.CopyObject(context, srcBucket, srcName, destBucket, destName, attrs)
return err
})
return obj, err
func storageCopyObject(ctx context.Context, srcBucket, srcName string, destBucket, destName string, attrs *storage.ObjectAttrs, gcs *storage.Client) (*storage.ObjectAttrs, error) {
src := gcs.Bucket(srcBucket).Object(srcName)
dst := gcs.Bucket(destBucket).Object(destName)
attrs, err := dst.CopierFrom(src).Run(ctx)
if err != nil {
var status *googleapi.Error
if errors.As(err, &status) {
if status.Code == http.StatusNotFound {
return nil, storagedriver.PathNotFoundError{Path: srcName}
}
}
return nil, fmt.Errorf("Object(%q).CopierFrom(%q).Run: %w", destName, srcName, err)
}
return attrs, err
}
// URLFor returns a URL which may be used to retrieve the content stored at
// the given path, possibly using the given options.
// Returns ErrUnsupportedMethod if this driver has no privateKey
func (d *driver) URLFor(context context.Context, path string, options map[string]interface{}) (string, error) {
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
if d.privateKey == nil {
return "", storagedriver.ErrUnsupportedMethod{}
}
@ -914,10 +926,6 @@ func putChunk(client *http.Client, sessionURI string, chunk []byte, from int64,
return bytesPut, err
}
func (d *driver) context(context context.Context) context.Context {
return cloud.WithContext(context, dummyProjectID, d.client)
}
func (d *driver) pathToKey(path string) string {
return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/"))
}

View file

@ -8,13 +8,14 @@ import (
"os"
"testing"
"cloud.google.com/go/storage"
dcontext "github.com/distribution/distribution/v3/context"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/testsuites"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
"google.golang.org/cloud/storage"
"google.golang.org/api/option"
"gopkg.in/check.v1"
)
@ -42,6 +43,11 @@ func init() {
return
}
jsonKey, err := os.ReadFile(credentials)
if err != nil {
panic(fmt.Sprintf("Error reading JSON key : %v", err))
}
root, err := os.MkdirTemp("", "driver-")
if err != nil {
panic(err)
@ -55,7 +61,7 @@ func init() {
if err != nil {
// Assume that the file contents are within the environment variable since it exists
// but does not contain a valid file path
jwtConfig, err := google.JWTConfigFromJSON([]byte(credentials), storage.ScopeFullControl)
jwtConfig, err := google.JWTConfigFromJSON(jsonKey, storage.ScopeFullControl)
if err != nil {
panic(fmt.Sprintf("Error reading JWT config : %s", err))
}
@ -70,14 +76,21 @@ func init() {
ts = jwtConfig.TokenSource(dcontext.Background())
}
gcs, err := storage.NewClient(dcontext.Background(), option.WithCredentialsJSON(jsonKey))
if err != nil {
panic(fmt.Sprintf("Error initializing gcs client : %v", err))
}
gcsDriverConstructor = func(rootDirectory string) (storagedriver.StorageDriver, error) {
parameters := driverParameters{
bucket: bucket,
rootDirectory: root,
email: email,
privateKey: privateKey,
client: oauth2.NewClient(dcontext.Background(), ts),
chunkSize: defaultChunkSize,
bucket: bucket,
rootDirectory: root,
email: email,
privateKey: privateKey,
client: oauth2.NewClient(dcontext.Background(), ts),
chunkSize: defaultChunkSize,
gcs: gcs,
maxConcurrency: 8,
}
return New(parameters)