refactor: gcs storage driver (#4120)

This commit is contained in:
Milos Gajdos 2023-11-17 13:06:07 +00:00 committed by GitHub
commit 9610a1e618
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -27,7 +27,6 @@ import (
"os" "os"
"reflect" "reflect"
"regexp" "regexp"
"sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -49,12 +48,14 @@ const (
driverName = "gcs" driverName = "gcs"
dummyProjectID = "<unknown>" dummyProjectID = "<unknown>"
uploadSessionContentType = "application/x-docker-upload-session"
minChunkSize = 256 * 1024 minChunkSize = 256 * 1024
defaultChunkSize = 20 * minChunkSize defaultChunkSize = 16 * 1024 * 1024
defaultMaxConcurrency = 50 defaultMaxConcurrency = 50
minConcurrency = 25 minConcurrency = 25
uploadSessionContentType = "application/x-docker-upload-session"
blobContentType = "application/octet-stream"
maxTries = 5 maxTries = 5
) )
@ -97,12 +98,11 @@ var _ storagedriver.StorageDriver = &driver{}
// Objects are stored at absolute keys in the provided bucket. // Objects are stored at absolute keys in the provided bucket.
type driver struct { type driver struct {
client *http.Client client *http.Client
bucket string bucket *storage.BucketHandle
email string email string
privateKey []byte privateKey []byte
rootDirectory string rootDirectory string
chunkSize int chunkSize int
gcs *storage.Client
} }
// Wrapper wraps `driver` with a throttler, ensuring that no more than N // Wrapper wraps `driver` with a throttler, ensuring that no more than N
@ -136,7 +136,7 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (sto
case string: case string:
vv, err := strconv.Atoi(v) vv, err := strconv.Atoi(v)
if err != nil { if err != nil {
return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam) return nil, fmt.Errorf("chunksize must be an integer, %v invalid", chunkSizeParam)
} }
chunkSize = vv chunkSize = vv
case int, uint, int32, uint32, uint64, int64: case int, uint, int32, uint32, uint64, int64:
@ -146,7 +146,7 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (sto
} }
if chunkSize < minChunkSize { if chunkSize < minChunkSize {
return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize) return nil, fmt.Errorf("chunksize %#v must be larger than or equal to %d", chunkSize, minChunkSize)
} }
if chunkSize%minChunkSize != 0 { if chunkSize%minChunkSize != 0 {
@ -203,10 +203,17 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (sto
} }
} else { } else {
var err error var err error
// DefaultTokenSource is a convenience method. It first calls FindDefaultCredentials,
// then uses the credentials to construct an http.Client or an oauth2.TokenSource.
// https://pkg.go.dev/golang.org/x/oauth2/google#hdr-Credentials
ts, err = google.DefaultTokenSource(ctx, storage.ScopeFullControl) ts, err = google.DefaultTokenSource(ctx, storage.ScopeFullControl)
if err != nil { if err != nil {
return nil, err return nil, err
} }
gcs, err = storage.NewClient(ctx)
if err != nil {
return nil, err
}
} }
maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency) maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency)
@ -214,9 +221,6 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (sto
return nil, fmt.Errorf("maxconcurrency config error: %s", err) return nil, fmt.Errorf("maxconcurrency config error: %s", err)
} }
if gcs == nil {
panic("gcs client was nil")
}
params := driverParameters{ params := driverParameters{
bucket: fmt.Sprint(bucket), bucket: fmt.Sprint(bucket),
rootDirectory: fmt.Sprint(rootDirectory), rootDirectory: fmt.Sprint(rootDirectory),
@ -241,13 +245,12 @@ func New(ctx context.Context, params driverParameters) (storagedriver.StorageDri
return nil, fmt.Errorf("Invalid chunksize: %d is not a positive multiple of %d", params.chunkSize, minChunkSize) return nil, fmt.Errorf("Invalid chunksize: %d is not a positive multiple of %d", params.chunkSize, minChunkSize)
} }
d := &driver{ d := &driver{
bucket: params.bucket, bucket: params.gcs.Bucket(params.bucket),
rootDirectory: rootDirectory, rootDirectory: rootDirectory,
email: params.email, email: params.email,
privateKey: params.privateKey, privateKey: params.privateKey,
client: params.client, client: params.client,
chunkSize: params.chunkSize, chunkSize: params.chunkSize,
gcs: params.gcs,
} }
return &Wrapper{ return &Wrapper{
@ -268,21 +271,16 @@ func (d *driver) Name() string {
// GetContent retrieves the content stored at "path" as a []byte. // GetContent retrieves the content stored at "path" as a []byte.
// This should primarily be used for small objects. // This should primarily be used for small objects.
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
name := d.pathToKey(path) r, err := d.bucket.Object(d.pathToKey(path)).NewReader(ctx)
rc, err := d.gcs.Bucket(d.bucket).Object(name).NewReader(ctx) if err != nil {
if err == storage.ErrObjectNotExist { if err == storage.ErrObjectNotExist {
return nil, storagedriver.PathNotFoundError{Path: path} return nil, storagedriver.PathNotFoundError{Path: path}
} }
if err != nil {
return nil, err return nil, err
} }
defer rc.Close() defer r.Close()
p, err := io.ReadAll(rc) return io.ReadAll(r)
if err != nil {
return nil, err
}
return p, nil
} }
// PutContent stores the []byte content at a location designated by "path". // PutContent stores the []byte content at a location designated by "path".
@ -290,30 +288,38 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
wc := d.gcs.Bucket(d.bucket).Object(d.pathToKey(path)).NewWriter(ctx)
wc.ContentType = "application/octet-stream" object := d.bucket.Object(d.pathToKey(path))
return putContentsClose(wc, contents) err := d.putContent(ctx, object, contents, blobContentType, nil)
if err != nil {
return err
}
return nil
} }
// Reader retrieves an io.ReadCloser for the content stored at "path" // Reader retrieves an io.ReadCloser for the content stored at "path"
// with a given byte offset. // with a given byte offset.
// May be used to resume reading a stream by providing a nonzero offset. // May be used to resume reading a stream by providing a nonzero offset.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
res, err := getObject(d.client, d.bucket, d.pathToKey(path), offset) obj := d.bucket.Object(d.pathToKey(path))
// NOTE(milosgajdos): If length is negative, the object is read until the end
// See: https://pkg.go.dev/cloud.google.com/go/storage#ObjectHandle.NewRangeReader
r, err := obj.NewRangeReader(ctx, offset, -1)
if err != nil { if err != nil {
if res != nil { if err == storage.ErrObjectNotExist {
if res.StatusCode == http.StatusNotFound {
res.Body.Close()
return nil, storagedriver.PathNotFoundError{Path: path} return nil, storagedriver.PathNotFoundError{Path: path}
} }
var status *googleapi.Error
if res.StatusCode == http.StatusRequestedRangeNotSatisfiable { if errors.As(err, &status) {
res.Body.Close() switch status.Code {
obj, err := d.storageStatObject(ctx, path) case http.StatusNotFound:
return nil, storagedriver.PathNotFoundError{Path: path}
case http.StatusRequestedRangeNotSatisfiable:
attrs, err := obj.Attrs(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if offset == obj.Size { if offset == attrs.Size {
return io.NopCloser(bytes.NewReader([]byte{})), nil return io.NopCloser(bytes.NewReader([]byte{})), nil
} }
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
@ -321,77 +327,52 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
} }
return nil, err return nil, err
} }
if res.Header.Get("Content-Type") == uploadSessionContentType { if r.Attrs.ContentType == uploadSessionContentType {
defer res.Body.Close() r.Close()
return nil, storagedriver.PathNotFoundError{Path: path} return nil, storagedriver.PathNotFoundError{Path: path}
} }
return res.Body, nil return r, nil
}
func getObject(client *http.Client, bucket string, name string, offset int64) (*http.Response, error) {
// copied from cloud.google.com/go/storage#NewReader :
// to set the additional "Range" header
u := &url.URL{
Scheme: "https",
Host: "storage.googleapis.com",
Path: fmt.Sprintf("/%s/%s", bucket, name),
}
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}
if offset > 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset))
}
var res *http.Response
err = retry(func() error {
var err error
res, err = client.Do(req)
return err
})
if err != nil {
return nil, err
}
return res, googleapi.CheckMediaResponse(res)
} }
// Writer returns a FileWriter which will store the content written to it // Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit. // at the location designated by "path" after the call to Commit.
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (storagedriver.FileWriter, error) {
writer := &writer{ w := &writer{
client: d.client, ctx: ctx,
bucket: d.bucket, driver: d,
name: d.pathToKey(path), object: d.bucket.Object(d.pathToKey(path)),
buffer: make([]byte, d.chunkSize), buffer: make([]byte, d.chunkSize),
gcs: d.gcs,
} }
if append { if appendMode {
err := writer.init(path) err := w.init(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
return writer, nil return w, nil
} }
type writer struct { type writer struct {
client *http.Client ctx context.Context
bucket string object *storage.ObjectHandle
name string driver *driver
size int64 size int64
offset int64 offset int64
closed bool closed bool
cancelled bool
committed bool
sessionURI string sessionURI string
buffer []byte buffer []byte
buffSize int buffSize int
gcs *storage.Client
} }
// Cancel removes any written content from this FileWriter. // Cancel removes any written content from this FileWriter.
func (w *writer) Cancel(ctx context.Context) error { func (w *writer) Cancel(ctx context.Context) error {
w.closed = true w.closed = true
err := storageDeleteObject(ctx, w.bucket, w.name, w.gcs) w.cancelled = true
err := w.object.Delete(ctx)
if err != nil { if err != nil {
if err == storage.ErrObjectNotExist { if err == storage.ErrObjectNotExist {
err = nil err = nil
@ -406,7 +387,7 @@ func (w *writer) Close() error {
} }
w.closed = true w.closed = true
err := w.writeChunk() err := w.writeChunk(w.ctx)
if err != nil { if err != nil {
return err return err
} }
@ -420,37 +401,29 @@ func (w *writer) Close() error {
} }
// commit the writes by updating the upload session // commit the writes by updating the upload session
ctx := context.TODO() metadata := map[string]string{
err = retry(func() error {
wc := w.gcs.Bucket(w.bucket).Object(w.name).NewWriter(ctx)
wc.ContentType = uploadSessionContentType
wc.Metadata = map[string]string{
"Session-URI": w.sessionURI, "Session-URI": w.sessionURI,
"Offset": strconv.FormatInt(w.offset, 10), "Offset": strconv.FormatInt(w.offset, 10),
} }
return putContentsClose(wc, w.buffer[0:w.buffSize]) err = retry(func() error {
}) err := w.driver.putContent(w.ctx, w.object, w.buffer[0:w.buffSize], uploadSessionContentType, metadata)
if err != nil { if err != nil {
return err return err
} }
w.size = w.offset + int64(w.buffSize) w.size = w.offset + int64(w.buffSize)
w.buffSize = 0 w.buffSize = 0
return nil return nil
})
return err
} }
func putContentsClose(wc *storage.Writer, contents []byte) error { func (d *driver) putContent(ctx context.Context, obj *storage.ObjectHandle, content []byte, contentType string, metadata map[string]string) error {
size := len(contents) wc := obj.NewWriter(ctx)
var nn int wc.Metadata = metadata
var err error wc.ContentType = contentType
for nn < size { wc.ChunkSize = d.chunkSize
var n int
n, err = wc.Write(contents[nn:size]) if _, err := bytes.NewReader(content).WriteTo(wc); err != nil {
nn += n
if err != nil {
break
}
}
if err != nil {
return err return err
} }
return wc.Close() return wc.Close()
@ -460,54 +433,48 @@ func putContentsClose(wc *storage.Writer, contents []byte) error {
// available for future calls to StorageDriver.GetContent and // available for future calls to StorageDriver.GetContent and
// StorageDriver.Reader. // StorageDriver.Reader.
func (w *writer) Commit(ctx context.Context) error { func (w *writer) Commit(ctx context.Context) error {
if err := w.checkClosed(); err != nil { if w.closed {
return err return fmt.Errorf("already closed")
} }
w.closed = true w.closed = true
// no session started yet just perform a simple upload // no session started yet just perform a simple upload
if w.sessionURI == "" { if w.sessionURI == "" {
err := retry(func() error { err := retry(func() error {
wc := w.gcs.Bucket(w.bucket).Object(w.name).NewWriter(ctx) err := w.driver.putContent(ctx, w.object, w.buffer[0:w.buffSize], blobContentType, nil)
wc.ContentType = "application/octet-stream"
return putContentsClose(wc, w.buffer[0:w.buffSize])
})
if err != nil { if err != nil {
return err return err
} }
w.committed = true
w.size = w.offset + int64(w.buffSize) w.size = w.offset + int64(w.buffSize)
w.buffSize = 0 w.buffSize = 0
return nil return nil
})
return err
} }
size := w.offset + int64(w.buffSize) size := w.offset + int64(w.buffSize)
var nn int var written int
// loop must be performed at least once to ensure the file is committed even when // loop must be performed at least once to ensure the file is committed even when
// the buffer is empty // the buffer is empty
for { for {
n, err := putChunk(w.client, w.sessionURI, w.buffer[nn:w.buffSize], w.offset, size) n, err := w.putChunk(ctx, w.sessionURI, w.buffer[written:w.buffSize], w.offset, size)
nn += int(n) written += int(n)
w.offset += n w.offset += n
w.size = w.offset w.size = w.offset
if err != nil { if err != nil {
w.buffSize = copy(w.buffer, w.buffer[nn:w.buffSize]) w.buffSize = copy(w.buffer, w.buffer[written:w.buffSize])
return err return err
} }
if nn == w.buffSize { if written == w.buffSize {
break break
} }
} }
w.committed = true
w.buffSize = 0 w.buffSize = 0
return nil return nil
} }
func (w *writer) checkClosed() error { func (w *writer) writeChunk(ctx context.Context) error {
if w.closed {
return fmt.Errorf("Writer already closed")
}
return nil
}
func (w *writer) writeChunk() error {
var err error var err error
// chunks can be uploaded only in multiples of minChunkSize // chunks can be uploaded only in multiples of minChunkSize
// chunkSize is a multiple of minChunkSize less than or equal to buffSize // chunkSize is a multiple of minChunkSize less than or equal to buffSize
@ -517,42 +484,47 @@ func (w *writer) writeChunk() error {
} }
// if their is no sessionURI yet, obtain one by starting the session // if their is no sessionURI yet, obtain one by starting the session
if w.sessionURI == "" { if w.sessionURI == "" {
w.sessionURI, err = startSession(w.client, w.bucket, w.name) w.sessionURI, err = w.newSession()
} }
if err != nil { if err != nil {
return err return err
} }
nn, err := putChunk(w.client, w.sessionURI, w.buffer[0:chunkSize], w.offset, -1) n, err := w.putChunk(ctx, w.sessionURI, w.buffer[0:chunkSize], w.offset, -1)
w.offset += nn w.offset += n
if w.offset > w.size { if w.offset > w.size {
w.size = w.offset w.size = w.offset
} }
// shift the remaining bytes to the start of the buffer // shift the remaining bytes to the start of the buffer
w.buffSize = copy(w.buffer, w.buffer[int(nn):w.buffSize]) w.buffSize = copy(w.buffer, w.buffer[int(n):w.buffSize])
return err return err
} }
func (w *writer) Write(p []byte) (int, error) { func (w *writer) Write(p []byte) (int, error) {
err := w.checkClosed() if w.closed {
if err != nil { return 0, fmt.Errorf("already closed")
return 0, err } else if w.cancelled {
return 0, fmt.Errorf("already cancelled")
} }
var nn int var (
for nn < len(p) { written int
n := copy(w.buffer[w.buffSize:], p[nn:]) err error
)
for written < len(p) {
n := copy(w.buffer[w.buffSize:], p[written:])
w.buffSize += n w.buffSize += n
if w.buffSize == cap(w.buffer) { if w.buffSize == cap(w.buffer) {
err = w.writeChunk() err = w.writeChunk(w.ctx)
if err != nil { if err != nil {
break break
} }
} }
nn += n written += n
} }
w.size = w.offset + int64(w.buffSize) w.size = w.offset + int64(w.buffSize)
return nn, err return written, err
} }
// Size returns the number of bytes written to this FileWriter. // Size returns the number of bytes written to this FileWriter.
@ -560,25 +532,56 @@ func (w *writer) Size() int64 {
return w.size return w.size
} }
func (w *writer) init(path string) error { func (w *writer) init(ctx context.Context) error {
res, err := getObject(w.client, w.bucket, w.name, 0) attrs, err := w.object.Attrs(ctx)
if err != nil { if err != nil {
return err return err
} }
defer res.Body.Close()
if res.Header.Get("Content-Type") != uploadSessionContentType { // NOTE(milosgajdos): when PUSH abruptly finishes by
return storagedriver.PathNotFoundError{Path: path} // calling a single commit and then closes the stream
// attrs.ContentType ends up being set to application/octet-stream
// We must handle this case so the upload can resume.
if attrs.ContentType != uploadSessionContentType &&
attrs.ContentType != blobContentType {
return storagedriver.PathNotFoundError{Path: w.object.ObjectName()}
} }
offset, err := strconv.ParseInt(res.Header.Get("X-Goog-Meta-Offset"), 10, 64)
offset := int64(0)
// NOTE(milosgajdos): if a client creates an empty blob, then
// closes the stream and then attempts to append to it, the offset
// will be empty, in which case strconv.ParseInt will return error
// See: https://pkg.go.dev/strconv#ParseInt
if attrs.Metadata["Offset"] != "" {
offset, err = strconv.ParseInt(attrs.Metadata["Offset"], 10, 64)
if err != nil { if err != nil {
return err return err
} }
buffer, err := io.ReadAll(res.Body) }
r, err := w.object.NewReader(ctx)
if err != nil { if err != nil {
return err return err
} }
w.sessionURI = res.Header.Get("X-Goog-Meta-Session-URI") defer r.Close()
w.buffSize = copy(w.buffer, buffer)
for err == nil && w.buffSize < len(w.buffer) {
var n int
n, err = r.Read(w.buffer[w.buffSize:])
w.buffSize += n
}
if err != nil && err != io.EOF {
return err
}
// NOTE(milosgajdos): if a client closes an existing session and then attempts
// to append to an existing blob, the session will be empty; recreate it
if w.sessionURI = attrs.Metadata["Session-URI"]; w.sessionURI == "" {
w.sessionURI, err = w.newSession()
if err != nil {
return err
}
}
w.offset = offset w.offset = offset
w.size = offset + int64(w.buffSize) w.size = offset + int64(w.buffSize)
return nil return nil
@ -596,7 +599,7 @@ func retry(req request) error {
} }
status, ok := err.(*googleapi.Error) status, ok := err.(*googleapi.Error)
if !ok || (status.Code != 429 && status.Code < http.StatusInternalServerError) { if !ok || (status.Code != http.StatusTooManyRequests && status.Code < http.StatusInternalServerError) {
return err return err
} }
@ -613,7 +616,7 @@ func retry(req request) error {
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
var fi storagedriver.FileInfoFields var fi storagedriver.FileInfoFields
// try to get as file // try to get as file
obj, err := d.storageStatObject(ctx, path) obj, err := d.bucket.Object(d.pathToKey(path)).Attrs(ctx)
if err == nil { if err == nil {
if obj.ContentType == uploadSessionContentType { if obj.ContentType == uploadSessionContentType {
return nil, storagedriver.PathNotFoundError{Path: path} return nil, storagedriver.PathNotFoundError{Path: path}
@ -633,18 +636,19 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
Prefix: dirpath, Prefix: dirpath,
} }
objects, err := storageListObjects(ctx, d.bucket, query, d.gcs) obj, err = d.bucket.Objects(ctx, query).Next()
if err != nil { if err != nil {
return nil, err if err == iterator.Done {
}
if len(objects) < 1 {
return nil, storagedriver.PathNotFoundError{Path: path} return nil, storagedriver.PathNotFoundError{Path: path}
} }
return nil, err
}
fi = storagedriver.FileInfoFields{ fi = storagedriver.FileInfoFields{
Path: path, Path: path,
IsDir: true, IsDir: true,
} }
obj = objects[0]
if obj.Name == dirpath { if obj.Name == dirpath {
fi.Size = obj.Size fi.Size = obj.Size
fi.ModTime = obj.Updated fi.ModTime = obj.Updated
@ -659,12 +663,17 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) {
Delimiter: "/", Delimiter: "/",
Prefix: d.pathToDirKey(path), Prefix: d.pathToDirKey(path),
} }
objects := d.bucket.Objects(ctx, query)
list := make([]string, 0, 64) list := make([]string, 0, 64)
objects, err := storageListObjects(ctx, d.bucket, query, d.gcs) for {
object, err := objects.Next()
if err != nil { if err != nil {
if err == iterator.Done {
break
}
return nil, err return nil, err
} }
for _, object := range objects {
// GCS does not guarantee strong consistency between // GCS does not guarantee strong consistency between
// DELETE and LIST operations. Check that the object is not deleted, // DELETE and LIST operations. Check that the object is not deleted,
// and filter out any objects with a non-zero time-deleted // and filter out any objects with a non-zero time-deleted
@ -689,42 +698,51 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) {
// Move moves an object stored at sourcePath to destPath, removing the // Move moves an object stored at sourcePath to destPath, removing the
// original object. // original object.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
_, err := storageCopyObject(ctx, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), d.gcs) srcKey, dstKey := d.pathToKey(sourcePath), d.pathToKey(destPath)
src := d.bucket.Object(srcKey)
_, err := d.bucket.Object(dstKey).CopierFrom(src).Run(ctx)
if err != nil { if err != nil {
if status, ok := err.(*googleapi.Error); ok { var status *googleapi.Error
if errors.As(err, &status) {
if status.Code == http.StatusNotFound { if status.Code == http.StatusNotFound {
return storagedriver.PathNotFoundError{Path: sourcePath} return storagedriver.PathNotFoundError{Path: srcKey}
} }
} }
return err return fmt.Errorf("move %q to %q: %v", srcKey, dstKey, err)
} }
err = storageDeleteObject(ctx, d.bucket, d.pathToKey(sourcePath), d.gcs) err = src.Delete(ctx)
// if deleting the file fails, log the error, but do not fail; the file was successfully copied, // if deleting the file fails, log the error, but do not fail; the file was successfully copied,
// and the original should eventually be cleaned when purging the uploads folder. // and the original should eventually be cleaned when purging the uploads folder.
if err != nil { if err != nil {
logrus.Infof("error deleting file: %v due to %v", sourcePath, err) logrus.Infof("error deleting %v: %v", sourcePath, err)
} }
return nil return nil
} }
// listAll recursively lists all names of objects stored at "prefix" and its subpaths. // listAll recursively lists all names of objects stored at "prefix" and its subpaths.
func (d *driver) listAll(ctx context.Context, prefix string) ([]string, error) { func (d *driver) listAll(ctx context.Context, prefix string) ([]string, error) {
objects := d.bucket.Objects(ctx, &storage.Query{
Prefix: prefix,
Versions: false,
})
list := make([]string, 0, 64) list := make([]string, 0, 64)
query := &storage.Query{} for {
query.Prefix = prefix object, err := objects.Next()
query.Versions = false
objects, err := storageListObjects(ctx, d.bucket, query, d.gcs)
if err != nil { if err != nil {
if err == iterator.Done {
break
}
return nil, err return nil, err
} }
for _, obj := range objects {
// GCS does not guarantee strong consistency between // GCS does not guarantee strong consistency between
// DELETE and LIST operations. Check that the object is not deleted, // DELETE and LIST operations. Check that the object is not deleted,
// and filter out any objects with a non-zero time-deleted // and filter out any objects with a non-zero time-deleted
if obj.Deleted.IsZero() { if object.Deleted.IsZero() {
list = append(list, obj.Name) list = append(list, object.Name)
} }
} }
return list, nil return list, nil
} }
@ -736,9 +754,14 @@ func (d *driver) Delete(ctx context.Context, path string) error {
return err return err
} }
if len(keys) > 0 { if len(keys) > 0 {
sort.Sort(sort.Reverse(sort.StringSlice(keys))) // NOTE(milosgajdos): d.listAll calls (BucketHandle).Objects
for _, key := range keys { // See: https://pkg.go.dev/cloud.google.com/go/storage#BucketHandle.Objects
err := storageDeleteObject(ctx, d.bucket, key, d.gcs) // docs: Objects will be iterated over lexicographically by name.
// This means we don't have to reverse order the slice; we can
// range over the keys slice in reverse order
for i := len(keys) - 1; i >= 0; i-- {
key := keys[i]
err := d.bucket.Object(key).Delete(ctx)
// GCS only guarantees eventual consistency, so listAll might return // GCS only guarantees eventual consistency, so listAll might return
// paths that no longer exist. If this happens, just ignore any not // paths that no longer exist. If this happens, just ignore any not
// found error // found error
@ -753,62 +776,13 @@ func (d *driver) Delete(ctx context.Context, path string) error {
} }
return nil return nil
} }
err = storageDeleteObject(ctx, d.bucket, d.pathToKey(path), d.gcs) err = d.bucket.Object(d.pathToKey(path)).Delete(ctx)
if err == storage.ErrObjectNotExist { if err == storage.ErrObjectNotExist {
return storagedriver.PathNotFoundError{Path: path} return storagedriver.PathNotFoundError{Path: path}
} }
return err return err
} }
func storageDeleteObject(ctx context.Context, bucket string, name string, gcs *storage.Client) error {
return gcs.Bucket(bucket).Object(name).Delete(ctx)
}
func (d *driver) storageStatObject(ctx context.Context, name string) (*storage.ObjectAttrs, error) {
bkt := d.gcs.Bucket(d.bucket)
var obj *storage.ObjectAttrs
err := retry(func() error {
var err error
obj, err = bkt.Object(d.pathToKey(name)).Attrs(ctx)
return err
})
return obj, err
}
func storageListObjects(ctx context.Context, bucket string, q *storage.Query, gcs *storage.Client) ([]*storage.ObjectAttrs, error) {
bkt := gcs.Bucket(bucket)
var objs []*storage.ObjectAttrs
it := bkt.Objects(ctx, q)
for {
objAttrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
objs = append(objs, objAttrs)
}
return objs, nil
}
func storageCopyObject(ctx context.Context, srcBucket, srcName string, destBucket, destName string, gcs *storage.Client) (*storage.ObjectAttrs, error) {
src := gcs.Bucket(srcBucket).Object(srcName)
dst := gcs.Bucket(destBucket).Object(destName)
attrs, err := dst.CopierFrom(src).Run(ctx)
if err != nil {
var status *googleapi.Error
if errors.As(err, &status) {
if status.Code == http.StatusNotFound {
return nil, storagedriver.PathNotFoundError{Path: srcName}
}
}
return nil, fmt.Errorf("Object(%q).CopierFrom(%q).Run: %w", destName, srcName, err)
}
return attrs, err
}
// RedirectURL returns a URL which may be used to retrieve the content stored at // RedirectURL returns a URL which may be used to retrieve the content stored at
// the given path, possibly using the given options. // the given path, possibly using the given options.
func (d *driver) RedirectURL(r *http.Request, path string) (string, error) { func (d *driver) RedirectURL(r *http.Request, path string) (string, error) {
@ -826,7 +800,7 @@ func (d *driver) RedirectURL(r *http.Request, path string) (string, error) {
Method: r.Method, Method: r.Method,
Expires: time.Now().Add(20 * time.Minute), Expires: time.Now().Add(20 * time.Minute),
} }
return storage.SignedURL(d.bucket, d.pathToKey(path), opts) return d.bucket.SignedURL(d.pathToKey(path), opts)
} }
// Walk traverses a filesystem defined within driver, starting // Walk traverses a filesystem defined within driver, starting
@ -835,21 +809,22 @@ func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn,
return storagedriver.WalkFallback(ctx, d, path, f, options...) return storagedriver.WalkFallback(ctx, d, path, f, options...)
} }
func startSession(client *http.Client, bucket string, name string) (uri string, err error) { func (w *writer) newSession() (uri string, err error) {
u := &url.URL{ u := &url.URL{
Scheme: "https", Scheme: "https",
Host: "www.googleapis.com", Host: "www.googleapis.com",
Path: fmt.Sprintf("/upload/storage/v1/b/%v/o", bucket), Path: fmt.Sprintf("/upload/storage/v1/b/%v/o", w.object.BucketName()),
RawQuery: fmt.Sprintf("uploadType=resumable&name=%v", name), RawQuery: fmt.Sprintf("uploadType=resumable&name=%v", w.object.ObjectName()),
} }
err = retry(func() error { req, err := http.NewRequestWithContext(w.ctx, http.MethodPost, u.String(), nil)
req, err := http.NewRequest(http.MethodPost, u.String(), nil)
if err != nil { if err != nil {
return err return "", err
} }
req.Header.Set("X-Upload-Content-Type", "application/octet-stream") req.Header.Set("X-Upload-Content-Type", blobContentType)
req.Header.Set("Content-Length", "0") req.Header.Set("Content-Length", "0")
resp, err := client.Do(req)
err = retry(func() error {
resp, err := w.driver.client.Do(req)
if err != nil { if err != nil {
return err return err
} }
@ -864,12 +839,10 @@ func startSession(client *http.Client, bucket string, name string) (uri string,
return uri, err return uri, err
} }
func putChunk(client *http.Client, sessionURI string, chunk []byte, from int64, totalSize int64) (int64, error) { func (w *writer) putChunk(ctx context.Context, sessionURI string, chunk []byte, from int64, totalSize int64) (int64, error) {
bytesPut := int64(0) req, err := http.NewRequestWithContext(ctx, http.MethodPut, sessionURI, bytes.NewReader(chunk))
err := retry(func() error {
req, err := http.NewRequest(http.MethodPut, sessionURI, bytes.NewReader(chunk))
if err != nil { if err != nil {
return err return 0, err
} }
length := int64(len(chunk)) length := int64(len(chunk))
to := from + length - 1 to := from + length - 1
@ -877,20 +850,22 @@ func putChunk(client *http.Client, sessionURI string, chunk []byte, from int64,
if totalSize >= 0 { if totalSize >= 0 {
size = strconv.FormatInt(totalSize, 10) size = strconv.FormatInt(totalSize, 10)
} }
req.Header.Set("Content-Type", "application/octet-stream") req.Header.Set("Content-Type", blobContentType)
if from == to+1 { if from == to+1 {
req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", size)) req.Header.Set("Content-Range", fmt.Sprintf("bytes */%s", size))
} else { } else {
req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", from, to, size)) req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%s", from, to, size))
} }
req.Header.Set("Content-Length", strconv.FormatInt(length, 10)) req.Header.Set("Content-Length", strconv.FormatInt(length, 10))
resp, err := client.Do(req) bytesPut := int64(0)
err = retry(func() error {
resp, err := w.driver.client.Do(req)
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
if totalSize < 0 && resp.StatusCode == 308 { if totalSize < 0 && resp.StatusCode == http.StatusPermanentRedirect {
groups := rangeHeader.FindStringSubmatch(resp.Header.Get("Range")) groups := rangeHeader.FindStringSubmatch(resp.Header.Get("Range"))
end, err := strconv.ParseInt(groups[2], 10, 64) end, err := strconv.ParseInt(groups[2], 10, 64)
if err != nil { if err != nil {