diff --git a/registry/storage/driver/gcs/gcs.go b/registry/storage/driver/gcs/gcs.go index 4cbe2fc3..cab97165 100644 --- a/registry/storage/driver/gcs/gcs.go +++ b/registry/storage/driver/gcs/gcs.go @@ -27,7 +27,6 @@ import ( "os" "reflect" "regexp" - "sort" "strconv" "strings" "time" @@ -49,11 +48,13 @@ const ( driverName = "gcs" dummyProjectID = "" + minChunkSize = 256 * 1024 + defaultChunkSize = 16 * 1024 * 1024 + defaultMaxConcurrency = 50 + minConcurrency = 25 + uploadSessionContentType = "application/x-docker-upload-session" - minChunkSize = 256 * 1024 - defaultChunkSize = 20 * minChunkSize - defaultMaxConcurrency = 50 - minConcurrency = 25 + blobContentType = "application/octet-stream" maxTries = 5 ) @@ -97,12 +98,11 @@ var _ storagedriver.StorageDriver = &driver{} // Objects are stored at absolute keys in the provided bucket. type driver struct { client *http.Client - bucket string + bucket *storage.BucketHandle email string privateKey []byte rootDirectory string chunkSize int - gcs *storage.Client } // Wrapper wraps `driver` with a throttler, ensuring that no more than N @@ -136,7 +136,7 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (sto case string: vv, err := strconv.Atoi(v) if err != nil { - return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam) + return nil, fmt.Errorf("chunksize must be an integer, %v invalid", chunkSizeParam) } chunkSize = vv case int, uint, int32, uint32, uint64, int64: @@ -146,7 +146,7 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (sto } if chunkSize < minChunkSize { - return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize) + return nil, fmt.Errorf("chunksize %#v must be larger than or equal to %d", chunkSize, minChunkSize) } if chunkSize%minChunkSize != 0 { @@ -203,10 +203,17 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (sto } } else { var err error + // DefaultTokenSource is a convenience method. It first calls FindDefaultCredentials, + // then uses the credentials to construct an http.Client or an oauth2.TokenSource. + // https://pkg.go.dev/golang.org/x/oauth2/google#hdr-Credentials ts, err = google.DefaultTokenSource(ctx, storage.ScopeFullControl) if err != nil { return nil, err } + gcs, err = storage.NewClient(ctx) + if err != nil { + return nil, err + } } maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency) @@ -214,9 +221,6 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (sto return nil, fmt.Errorf("maxconcurrency config error: %s", err) } - if gcs == nil { - panic("gcs client was nil") - } params := driverParameters{ bucket: fmt.Sprint(bucket), rootDirectory: fmt.Sprint(rootDirectory), @@ -241,13 +245,12 @@ func New(ctx context.Context, params driverParameters) (storagedriver.StorageDri return nil, fmt.Errorf("Invalid chunksize: %d is not a positive multiple of %d", params.chunkSize, minChunkSize) } d := &driver{ - bucket: params.bucket, + bucket: params.gcs.Bucket(params.bucket), rootDirectory: rootDirectory, email: params.email, privateKey: params.privateKey, client: params.client, chunkSize: params.chunkSize, - gcs: params.gcs, } return &Wrapper{ @@ -268,21 +271,16 @@ func (d *driver) Name() string { // GetContent retrieves the content stored at "path" as a []byte. // This should primarily be used for small objects. 
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { - name := d.pathToKey(path) - rc, err := d.gcs.Bucket(d.bucket).Object(name).NewReader(ctx) - if err == storage.ErrObjectNotExist { - return nil, storagedriver.PathNotFoundError{Path: path} - } + r, err := d.bucket.Object(d.pathToKey(path)).NewReader(ctx) if err != nil { + if err == storage.ErrObjectNotExist { + return nil, storagedriver.PathNotFoundError{Path: path} + } return nil, err } - defer rc.Close() + defer r.Close() - p, err := io.ReadAll(rc) - if err != nil { - return nil, err - } - return p, nil + return io.ReadAll(r) } // PutContent stores the []byte content at a location designated by "path". @@ -290,30 +288,38 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - wc := d.gcs.Bucket(d.bucket).Object(d.pathToKey(path)).NewWriter(ctx) - wc.ContentType = "application/octet-stream" - return putContentsClose(wc, contents) + + object := d.bucket.Object(d.pathToKey(path)) + err := d.putContent(ctx, object, contents, blobContentType, nil) + if err != nil { + return err + } + return nil } // Reader retrieves an io.ReadCloser for the content stored at "path" // with a given byte offset. // May be used to resume reading a stream by providing a nonzero offset. func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { - res, err := getObject(d.client, d.bucket, d.pathToKey(path), offset) + obj := d.bucket.Object(d.pathToKey(path)) + // NOTE(milosgajdos): If length is negative, the object is read until the end + // See: https://pkg.go.dev/cloud.google.com/go/storage#ObjectHandle.NewRangeReader + r, err := obj.NewRangeReader(ctx, offset, -1) if err != nil { - if res != nil { - if res.StatusCode == http.StatusNotFound { - res.Body.Close() + if err == storage.ErrObjectNotExist { + return nil, storagedriver.PathNotFoundError{Path: path} + } + var status *googleapi.Error + if errors.As(err, &status) { + switch status.Code { + case http.StatusNotFound: return nil, storagedriver.PathNotFoundError{Path: path} - } - - if res.StatusCode == http.StatusRequestedRangeNotSatisfiable { - res.Body.Close() - obj, err := d.storageStatObject(ctx, path) + case http.StatusRequestedRangeNotSatisfiable: + attrs, err := obj.Attrs(ctx) if err != nil { return nil, err } - if offset == obj.Size { + if offset == attrs.Size { return io.NopCloser(bytes.NewReader([]byte{})), nil } return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} @@ -321,77 +327,52 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read } return nil, err } - if res.Header.Get("Content-Type") == uploadSessionContentType { - defer res.Body.Close() + if r.Attrs.ContentType == uploadSessionContentType { + r.Close() return nil, storagedriver.PathNotFoundError{Path: path} } - return res.Body, nil -} - -func getObject(client *http.Client, bucket string, name string, offset int64) (*http.Response, error) { - // copied from cloud.google.com/go/storage#NewReader : - // to set the additional "Range" header - u := &url.URL{ - Scheme: "https", - Host: "storage.googleapis.com", - Path: fmt.Sprintf("/%s/%s", bucket, name), - } - req, err := http.NewRequest(http.MethodGet, u.String(), nil) - if err != nil { - return nil, err - } - if offset > 0 { - req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset)) - } - var res *http.Response 
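// NOTE(editor): a minimal, self-contained sketch of the offset-read pattern
// Reader now uses; readFrom and its parameters are illustrative, not part of
// this driver — it assumes only cloud.google.com/go/storage and the stdlib.
func readFrom(ctx context.Context, c *storage.Client, bucket, key string, offset int64) ([]byte, error) {
	// a negative length reads from offset to the end of the object
	r, err := c.Bucket(bucket).Object(key).NewRangeReader(ctx, offset, -1)
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}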
- err = retry(func() error { - var err error - res, err = client.Do(req) - return err - }) - if err != nil { - return nil, err - } - return res, googleapi.CheckMediaResponse(res) + return r, nil } // Writer returns a FileWriter which will store the content written to it // at the location designated by "path" after the call to Commit. -func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { - writer := &writer{ - client: d.client, - bucket: d.bucket, - name: d.pathToKey(path), +func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (storagedriver.FileWriter, error) { + w := &writer{ + ctx: ctx, + driver: d, + object: d.bucket.Object(d.pathToKey(path)), buffer: make([]byte, d.chunkSize), - gcs: d.gcs, } - if append { - err := writer.init(path) + if appendMode { + err := w.init(ctx) if err != nil { return nil, err } } - return writer, nil + return w, nil } type writer struct { - client *http.Client - bucket string - name string + ctx context.Context + object *storage.ObjectHandle + driver *driver size int64 offset int64 closed bool + cancelled bool + committed bool sessionURI string buffer []byte buffSize int - gcs *storage.Client } // Cancel removes any written content from this FileWriter. func (w *writer) Cancel(ctx context.Context) error { w.closed = true - err := storageDeleteObject(ctx, w.bucket, w.name, w.gcs) + w.cancelled = true + + err := w.object.Delete(ctx) if err != nil { if err == storage.ErrObjectNotExist { err = nil @@ -406,7 +387,7 @@ func (w *writer) Close() error { } w.closed = true - err := w.writeChunk() + err := w.writeChunk(w.ctx) if err != nil { return err } @@ -420,37 +401,29 @@ func (w *writer) Close() error { } // commit the writes by updating the upload session - ctx := context.TODO() - err = retry(func() error { - wc := w.gcs.Bucket(w.bucket).Object(w.name).NewWriter(ctx) - wc.ContentType = uploadSessionContentType - wc.Metadata = map[string]string{ - "Session-URI": w.sessionURI, - "Offset": strconv.FormatInt(w.offset, 10), - } - return putContentsClose(wc, w.buffer[0:w.buffSize]) - }) - if err != nil { - return err + metadata := map[string]string{ + "Session-URI": w.sessionURI, + "Offset": strconv.FormatInt(w.offset, 10), } - w.size = w.offset + int64(w.buffSize) - w.buffSize = 0 - return nil + err = retry(func() error { + err := w.driver.putContent(w.ctx, w.object, w.buffer[0:w.buffSize], uploadSessionContentType, metadata) + if err != nil { + return err + } + w.size = w.offset + int64(w.buffSize) + w.buffSize = 0 + return nil + }) + return err } -func putContentsClose(wc *storage.Writer, contents []byte) error { - size := len(contents) - var nn int - var err error - for nn < size { - var n int - n, err = wc.Write(contents[nn:size]) - nn += n - if err != nil { - break - } - } - if err != nil { +func (d *driver) putContent(ctx context.Context, obj *storage.ObjectHandle, content []byte, contentType string, metadata map[string]string) error { + wc := obj.NewWriter(ctx) + wc.Metadata = metadata + wc.ContentType = contentType + wc.ChunkSize = d.chunkSize + + if _, err := bytes.NewReader(content).WriteTo(wc); err != nil { return err } return wc.Close() @@ -460,54 +433,48 @@ func putContentsClose(wc *storage.Writer, contents []byte) error { // available for future calls to StorageDriver.GetContent and // StorageDriver.Reader. 
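// NOTE(editor): an illustrative sketch (not part of this change) of how a
// caller drives the FileWriter returned by Writer above, assuming the usual
// distribution storagedriver contract (Write, then Commit, then Close).
func upload(ctx context.Context, d storagedriver.StorageDriver, path string, data []byte) error {
	fw, err := d.Writer(ctx, path, false) // false: start a new upload rather than append
	if err != nil {
		return err
	}
	defer fw.Close()
	if _, err := fw.Write(data); err != nil {
		fw.Cancel(ctx) // best-effort removal of the partial upload
		return err
	}
	return fw.Commit(ctx)
}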
func (w *writer) Commit(ctx context.Context) error { - if err := w.checkClosed(); err != nil { - return err + if w.closed { + return fmt.Errorf("already closed") } w.closed = true // no session started yet; just perform a simple upload if w.sessionURI == "" { err := retry(func() error { - wc := w.gcs.Bucket(w.bucket).Object(w.name).NewWriter(ctx) - wc.ContentType = "application/octet-stream" - return putContentsClose(wc, w.buffer[0:w.buffSize]) + err := w.driver.putContent(ctx, w.object, w.buffer[0:w.buffSize], blobContentType, nil) + if err != nil { + return err + } + w.committed = true + w.size = w.offset + int64(w.buffSize) + w.buffSize = 0 + return nil }) - if err != nil { - return err - } - w.size = w.offset + int64(w.buffSize) - w.buffSize = 0 - return nil + return err } size := w.offset + int64(w.buffSize) - var nn int + var written int // loop must be performed at least once to ensure the file is committed even when // the buffer is empty for { - n, err := putChunk(w.client, w.sessionURI, w.buffer[nn:w.buffSize], w.offset, size) - nn += int(n) + n, err := w.putChunk(ctx, w.sessionURI, w.buffer[written:w.buffSize], w.offset, size) + written += int(n) w.offset += n w.size = w.offset if err != nil { - w.buffSize = copy(w.buffer, w.buffer[nn:w.buffSize]) + w.buffSize = copy(w.buffer, w.buffer[written:w.buffSize]) return err } - if nn == w.buffSize { + if written == w.buffSize { break } } + w.committed = true w.buffSize = 0 return nil } -func (w *writer) checkClosed() error { - if w.closed { - return fmt.Errorf("Writer already closed") - } - return nil -} - -func (w *writer) writeChunk() error { +func (w *writer) writeChunk(ctx context.Context) error { var err error // chunks can be uploaded only in multiples of minChunkSize // chunkSize is a multiple of minChunkSize less than or equal to buffSize @@ -517,42 +484,47 @@ } // if there is no sessionURI yet, obtain one by starting the session if w.sessionURI == "" { - w.sessionURI, err = startSession(w.client, w.bucket, w.name) + w.sessionURI, err = w.newSession() } if err != nil { return err } - nn, err := putChunk(w.client, w.sessionURI, w.buffer[0:chunkSize], w.offset, -1) - w.offset += nn + n, err := w.putChunk(ctx, w.sessionURI, w.buffer[0:chunkSize], w.offset, -1) + w.offset += n if w.offset > w.size { w.size = w.offset } // shift the remaining bytes to the start of the buffer - w.buffSize = copy(w.buffer, w.buffer[int(nn):w.buffSize]) + w.buffSize = copy(w.buffer, w.buffer[int(n):w.buffSize]) return err } func (w *writer) Write(p []byte) (int, error) { - err := w.checkClosed() - if err != nil { - return 0, err + if w.closed { + return 0, fmt.Errorf("already closed") + } else if w.cancelled { + return 0, fmt.Errorf("already cancelled") } - var nn int - for nn < len(p) { - n := copy(w.buffer[w.buffSize:], p[nn:]) + var ( + written int + err error + ) + + for written < len(p) { + n := copy(w.buffer[w.buffSize:], p[written:]) w.buffSize += n if w.buffSize == cap(w.buffer) { - err = w.writeChunk() + err = w.writeChunk(w.ctx) if err != nil { break } } - nn += n + written += n } w.size = w.offset + int64(w.buffSize) - return nn, err + return written, err } // Size returns the number of bytes written to this FileWriter.
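// NOTE(editor): writeChunk above flushes only the largest multiple of
// minChunkSize that fits in the buffer (per the comment in the hunk), because
// GCS resumable uploads accept intermediate chunks only in 256 KiB increments;
// the remainder stays buffered. An illustrative restatement of that arithmetic:
func alignedChunkSize(buffSize int) int {
	const minChunkSize = 256 * 1024
	// e.g. a 700 KiB buffer flushes 512 KiB and keeps 188 KiB buffered
	return (buffSize / minChunkSize) * minChunkSize
}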
@@ -560,25 +532,56 @@ func (w *writer) Size() int64 { return w.size } -func (w *writer) init(path string) error { - res, err := getObject(w.client, w.bucket, w.name, 0) +func (w *writer) init(ctx context.Context) error { + attrs, err := w.object.Attrs(ctx) if err != nil { return err } - defer res.Body.Close() - if res.Header.Get("Content-Type") != uploadSessionContentType { - return storagedriver.PathNotFoundError{Path: path} + + // NOTE(milosgajdos): when a push finishes abruptly by + // issuing a single commit and then closing the stream, + // attrs.ContentType ends up set to application/octet-stream. + // We must handle this case so the upload can resume. + if attrs.ContentType != uploadSessionContentType && + attrs.ContentType != blobContentType { + return storagedriver.PathNotFoundError{Path: w.object.ObjectName()} } - offset, err := strconv.ParseInt(res.Header.Get("X-Goog-Meta-Offset"), 10, 64) + + offset := int64(0) + // NOTE(milosgajdos): if a client creates an empty blob, then + // closes the stream and then attempts to append to it, the offset + // will be empty, in which case strconv.ParseInt would return an error + // See: https://pkg.go.dev/strconv#ParseInt + if attrs.Metadata["Offset"] != "" { + offset, err = strconv.ParseInt(attrs.Metadata["Offset"], 10, 64) + if err != nil { + return err + } + } + + r, err := w.object.NewReader(ctx) if err != nil { return err } - buffer, err := io.ReadAll(res.Body) - if err != nil { + defer r.Close() + + for err == nil && w.buffSize < len(w.buffer) { + var n int + n, err = r.Read(w.buffer[w.buffSize:]) + w.buffSize += n + } + if err != nil && err != io.EOF { return err } - w.sessionURI = res.Header.Get("X-Goog-Meta-Session-URI") - w.buffSize = copy(w.buffer, buffer) + + // NOTE(milosgajdos): if a client closes an existing session and then attempts + // to append to an existing blob, the session will be empty; recreate it + if w.sessionURI = attrs.Metadata["Session-URI"]; w.sessionURI == "" { + w.sessionURI, err = w.newSession() + if err != nil { + return err + } + } w.offset = offset w.size = offset + int64(w.buffSize) return nil } @@ -596,7 +599,7 @@ func retry(req request) error { } status, ok := err.(*googleapi.Error) - if !ok || (status.Code != 429 && status.Code < http.StatusInternalServerError) { + if !ok || (status.Code != http.StatusTooManyRequests && status.Code < http.StatusInternalServerError) { return err } @@ -613,7 +616,7 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { var fi storagedriver.FileInfoFields // try to get as file - obj, err := d.storageStatObject(ctx, path) + obj, err := d.bucket.Object(d.pathToKey(path)).Attrs(ctx) if err == nil { if obj.ContentType == uploadSessionContentType { return nil, storagedriver.PathNotFoundError{Path: path} } @@ -633,18 +636,19 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, Prefix: dirpath, } - objects, err := storageListObjects(ctx, d.bucket, query, d.gcs) + obj, err = d.bucket.Objects(ctx, query).Next() if err != nil { + if err == iterator.Done { + return nil, storagedriver.PathNotFoundError{Path: path} + } return nil, err } - if len(objects) < 1 { - return nil, storagedriver.PathNotFoundError{Path: path} - } + fi = storagedriver.FileInfoFields{ Path: path, IsDir: true, } - obj = objects[0] + if obj.Name == dirpath { fi.Size = obj.Size fi.ModTime = obj.Updated @@ -659,12 +663,17 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) { Delimiter: "/", Prefix:
d.pathToDirKey(path), } + objects := d.bucket.Objects(ctx, query) + list := make([]string, 0, 64) - objects, err := storageListObjects(ctx, d.bucket, query, d.gcs) - if err != nil { - return nil, err - } - for _, object := range objects { + for { + object, err := objects.Next() + if err != nil { + if err == iterator.Done { + break + } + return nil, err + } // GCS does not guarantee strong consistency between // DELETE and LIST operations. Check that the object is not deleted, // and filter out any objects with a non-zero time-deleted @@ -689,42 +698,51 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) { // Move moves an object stored at sourcePath to destPath, removing the // original object. func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { - _, err := storageCopyObject(ctx, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), d.gcs) + srcKey, dstKey := d.pathToKey(sourcePath), d.pathToKey(destPath) + src := d.bucket.Object(srcKey) + _, err := d.bucket.Object(dstKey).CopierFrom(src).Run(ctx) if err != nil { - if status, ok := err.(*googleapi.Error); ok { + var status *googleapi.Error + if errors.As(err, &status) { if status.Code == http.StatusNotFound { return storagedriver.PathNotFoundError{Path: sourcePath} } } - return err + return fmt.Errorf("move %q to %q: %w", srcKey, dstKey, err) } - err = storageDeleteObject(ctx, d.bucket, d.pathToKey(sourcePath), d.gcs) + err = src.Delete(ctx) // if deleting the file fails, log the error, but do not fail; the file was successfully copied, // and the original should eventually be cleaned when purging the uploads folder. if err != nil { - logrus.Infof("error deleting file: %v due to %v", sourcePath, err) + logrus.Infof("error deleting %v: %v", sourcePath, err) } return nil } // listAll recursively lists all names of objects stored at "prefix" and its subpaths. func (d *driver) listAll(ctx context.Context, prefix string) ([]string, error) { + objects := d.bucket.Objects(ctx, &storage.Query{ + Prefix: prefix, + Versions: false, + }) + list := make([]string, 0, 64) - query := &storage.Query{} - query.Prefix = prefix - query.Versions = false - objects, err := storageListObjects(ctx, d.bucket, query, d.gcs) - if err != nil { - return nil, err - } - for _, obj := range objects { + for { + object, err := objects.Next() + if err != nil { + if err == iterator.Done { + break + } + return nil, err + } // GCS does not guarantee strong consistency between // DELETE and LIST operations. Check that the object is not deleted, // and filter out any objects with a non-zero time-deleted - if obj.Deleted.IsZero() { - list = append(list, obj.Name) + if object.Deleted.IsZero() { + list = append(list, object.Name) } } + return list, nil } @@ -736,9 +754,14 @@ func (d *driver) Delete(ctx context.Context, path string) error { return err } if len(keys) > 0 { - sort.Sort(sort.Reverse(sort.StringSlice(keys))) - for _, key := range keys { - err := storageDeleteObject(ctx, d.bucket, key, d.gcs) + // NOTE(milosgajdos): d.listAll calls (BucketHandle).Objects + // See: https://pkg.go.dev/cloud.google.com/go/storage#BucketHandle.Objects + // docs: Objects will be iterated over lexicographically by name.
+ // This means we don't have to sort the keys ourselves; we can + // simply range over the keys slice in reverse order + for i := len(keys) - 1; i >= 0; i-- { + key := keys[i] + err := d.bucket.Object(key).Delete(ctx) // GCS only guarantees eventual consistency, so listAll might return // paths that no longer exist. If this happens, just ignore any not // found error @@ -753,62 +776,13 @@ func (d *driver) Delete(ctx context.Context, path string) error { } return nil } - err = storageDeleteObject(ctx, d.bucket, d.pathToKey(path), d.gcs) + err = d.bucket.Object(d.pathToKey(path)).Delete(ctx) if err == storage.ErrObjectNotExist { return storagedriver.PathNotFoundError{Path: path} } return err } -func storageDeleteObject(ctx context.Context, bucket string, name string, gcs *storage.Client) error { - return gcs.Bucket(bucket).Object(name).Delete(ctx) -} - -func (d *driver) storageStatObject(ctx context.Context, name string) (*storage.ObjectAttrs, error) { - bkt := d.gcs.Bucket(d.bucket) - var obj *storage.ObjectAttrs - err := retry(func() error { - var err error - obj, err = bkt.Object(d.pathToKey(name)).Attrs(ctx) - return err - }) - return obj, err -} - -func storageListObjects(ctx context.Context, bucket string, q *storage.Query, gcs *storage.Client) ([]*storage.ObjectAttrs, error) { - bkt := gcs.Bucket(bucket) - var objs []*storage.ObjectAttrs - it := bkt.Objects(ctx, q) - for { - objAttrs, err := it.Next() - if err == iterator.Done { - break - } - if err != nil { - return nil, err - } - objs = append(objs, objAttrs) - } - - return objs, nil -} - -func storageCopyObject(ctx context.Context, srcBucket, srcName string, destBucket, destName string, gcs *storage.Client) (*storage.ObjectAttrs, error) { - src := gcs.Bucket(srcBucket).Object(srcName) - dst := gcs.Bucket(destBucket).Object(destName) - attrs, err := dst.CopierFrom(src).Run(ctx) - if err != nil { - var status *googleapi.Error - if errors.As(err, &status) { - if status.Code == http.StatusNotFound { - return nil, storagedriver.PathNotFoundError{Path: srcName} - } - } - return nil, fmt.Errorf("Object(%q).CopierFrom(%q).Run: %w", destName, srcName, err) - } - return attrs, err -} - // RedirectURL returns a URL which may be used to retrieve the content stored at // the given path, possibly using the given options. func (d *driver) RedirectURL(r *http.Request, path string) (string, error) { @@ -826,7 +800,7 @@ func (d *driver) RedirectURL(r *http.Request, path string) (string, error) { Method: r.Method, Expires: time.Now().Add(20 * time.Minute), } - return storage.SignedURL(d.bucket, d.pathToKey(path), opts) + return d.bucket.SignedURL(d.pathToKey(path), opts) } // Walk traverses a filesystem defined within driver, starting @@ -835,21 +809,22 @@ func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn, return storagedriver.WalkFallback(ctx, d, path, f, options...)
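// NOTE(editor): Stat, List, listAll, and Delete above all share the same
// iterator idiom — call Next until it returns iterator.Done. A self-contained
// sketch (keysWithPrefix is illustrative, not part of the driver):
func keysWithPrefix(ctx context.Context, bucket *storage.BucketHandle, prefix string) ([]string, error) {
	it := bucket.Objects(ctx, &storage.Query{Prefix: prefix})
	keys := make([]string, 0, 64)
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}
		keys = append(keys, attrs.Name)
	}
	return keys, nil
}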
} -func startSession(client *http.Client, bucket string, name string) (uri string, err error) { +func (w *writer) newSession() (uri string, err error) { u := &url.URL{ Scheme: "https", Host: "www.googleapis.com", - Path: fmt.Sprintf("/upload/storage/v1/b/%v/o", bucket), - RawQuery: fmt.Sprintf("uploadType=resumable&name=%v", name), + Path: fmt.Sprintf("/upload/storage/v1/b/%v/o", w.object.BucketName()), + RawQuery: fmt.Sprintf("uploadType=resumable&name=%v", w.object.ObjectName()), } + // NOTE: build the request inside the retry closure so that every + // attempt starts from a fresh request. err = retry(func() error { - req, err := http.NewRequest(http.MethodPost, u.String(), nil) + req, err := http.NewRequestWithContext(w.ctx, http.MethodPost, u.String(), nil) if err != nil { return err } - req.Header.Set("X-Upload-Content-Type", "application/octet-stream") + req.Header.Set("X-Upload-Content-Type", blobContentType) req.Header.Set("Content-Length", "0") - resp, err := client.Do(req) + resp, err := w.driver.client.Do(req) if err != nil { return err } @@ -864,33 +839,33 @@ func startSession(client *http.Client, bucket string, name string, return uri, err } -func putChunk(client *http.Client, sessionURI string, chunk []byte, from int64, totalSize int64) (int64, error) { +func (w *writer) putChunk(ctx context.Context, sessionURI string, chunk []byte, from int64, totalSize int64) (int64, error) { bytesPut := int64(0) err := retry(func() error { + // NOTE: the request (and its body reader) must be recreated on every + // attempt; reusing one request across retries would resend an + // already-consumed body. - req, err := http.NewRequest(http.MethodPut, sessionURI, bytes.NewReader(chunk)) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, sessionURI, bytes.NewReader(chunk)) if err != nil { return err } length := int64(len(chunk)) to := from + length - 1 size := "*" if totalSize >= 0 { size = strconv.FormatInt(totalSize, 10) } - req.Header.Set("Content-Type", "application/octet-stream") + req.Header.Set("Content-Type", blobContentType) if from == to+1 { - req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", size)) + req.Header.Set("Content-Range", fmt.Sprintf("bytes */%s", size)) } else { - req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", from, to, size)) + req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%s", from, to, size)) } req.Header.Set("Content-Length", strconv.FormatInt(length, 10)) - resp, err := client.Do(req) + resp, err := w.driver.client.Do(req) if err != nil { return err } defer resp.Body.Close() - if totalSize < 0 && resp.StatusCode == 308 { + if totalSize < 0 && resp.StatusCode == http.StatusPermanentRedirect { groups := rangeHeader.FindStringSubmatch(resp.Header.Get("Range")) end, err := strconv.ParseInt(groups[2], 10, 64) if err != nil {
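// NOTE(editor): the Content-Range forms putChunk emits for a GCS resumable
// upload: "bytes from-to/size" for a data chunk, "*" for the size while the
// total is still unknown, and "bytes */size" for the zero-length request that
// finalizes the session (from == to+1 exactly when the chunk is empty).
// contentRange is an illustrative helper, not part of the driver:
func contentRange(from int64, chunk []byte, totalSize int64) string {
	size := "*"
	if totalSize >= 0 {
		size = strconv.FormatInt(totalSize, 10)
	}
	if len(chunk) == 0 { // empty body: commit or query the session
		return fmt.Sprintf("bytes */%s", size)
	}
	return fmt.Sprintf("bytes %d-%d/%s", from, from+int64(len(chunk))-1, size)
}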