// Package gcs provides a storagedriver.StorageDriver implementation to
// store blobs in Google cloud storage.
//
// This package leverages the cloud.google.com/go/storage client library
// for interfacing with gcs.
//
// Because gcs is a key-value store, the Stat call does not support last modification
// time for directories (directories are an abstraction in key-value stores).
//
// Note that the contents of incomplete uploads are not accessible even though
// Stat returns their length
package gcs

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net/http"
	"net/url"
	"os"
	"reflect"
	"regexp"
	"strconv"
	"strings"
	"time"

	"cloud.google.com/go/storage"
	storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
	"github.com/distribution/distribution/v3/registry/storage/driver/base"
	"github.com/distribution/distribution/v3/registry/storage/driver/factory"
	"github.com/sirupsen/logrus"
	"golang.org/x/oauth2"
	"golang.org/x/oauth2/google"
	"golang.org/x/oauth2/jwt"
	"google.golang.org/api/googleapi"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
)

const (
	// driverName is the name this driver is registered under in init and
	// the value returned by (*driver).Name.
	driverName     = "gcs"
	dummyProjectID = "<unknown>"

	// minChunkSize is the granularity of resumable uploads: writeChunk only
	// uploads whole multiples of this size, and a configured chunksize must
	// be a positive multiple of it.
	minChunkSize          = 256 * 1024
	// defaultChunkSize is used when no "chunksize" parameter is supplied.
	defaultChunkSize      = 16 * 1024 * 1024
	// defaultMaxConcurrency and minConcurrency are handed to
	// base.GetLimitFromParameter when parsing "maxconcurrency".
	// NOTE(review): presumably the default and lower bound respectively —
	// confirm against the base package.
	defaultMaxConcurrency = 50
	minConcurrency        = 25

	// uploadSessionContentType marks objects that hold the state of an
	// upload that has been closed but not committed; such objects are
	// hidden from Reader, Stat and List.
	uploadSessionContentType = "application/x-docker-upload-session"
	// blobContentType is the content type of committed blobs.
	blobContentType          = "application/octet-stream"

	// maxTries bounds the number of attempts made by retry.
	maxTries = 5
)

// rangeHeader matches the "Range" response header inspected by putChunk
// (for example "bytes=0-262143"). Group 1 is the first byte position and
// group 2 is the last byte position reported by the server.
var rangeHeader = regexp.MustCompile(`^bytes=([0-9]+)-([0-9]+)$`)

// Compile-time assertion that *writer implements storagedriver.FileWriter.
var _ storagedriver.FileWriter = &writer{}

// driverParameters is a struct that encapsulates all of the driver parameters after all values have been set
type driverParameters struct {
	// bucket is the name of the GCS bucket the driver operates on.
	bucket        string
	// email and privateKey are copied from the JWT config by FromParameters
	// and later used to sign redirect URLs.
	email         string
	privateKey    []byte
	// client is the HTTP client the writer uses for resumable upload requests.
	client        *http.Client
	// rootDirectory is the key prefix under which all objects are stored.
	rootDirectory string
	// chunkSize is the writer buffer size; New requires it to be a positive
	// multiple of minChunkSize.
	chunkSize     int
	// gcs is the storage client from which the bucket handle is obtained.
	gcs           *storage.Client

	// maxConcurrency limits the number of concurrent driver operations
	// to GCS, which ultimately increases reliability of many simultaneous
	// pushes by ensuring we aren't DoSing our own server with many
	// connections.
	maxConcurrency uint64
}

// init registers the GCS driver factory with the factory package under
// driverName.
func init() {
	factory.Register(driverName, new(gcsDriverFactory))
}

// gcsDriverFactory implements the factory.StorageDriverFactory interface.
// It carries no state; Create simply delegates to FromParameters.
type gcsDriverFactory struct{}

// Create builds a GCS-backed StorageDriver from the given parameters map by
// delegating to FromParameters.
func (f *gcsDriverFactory) Create(ctx context.Context, parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
	return FromParameters(ctx, parameters)
}

// Compile-time assertion that *driver implements storagedriver.StorageDriver.
var _ storagedriver.StorageDriver = &driver{}

// driver is a storagedriver.StorageDriver implementation backed by GCS
// Objects are stored at absolute keys in the provided bucket.
type driver struct {
	// client performs the raw HTTP requests of resumable uploads.
	client        *http.Client
	// bucket is the handle through which all objects are accessed.
	bucket        *storage.BucketHandle
	// email and privateKey are used to sign URLs in RedirectURL.
	email         string
	privateKey    []byte
	// rootDirectory is prepended to every path to form the object key; New
	// normalizes it to be either empty or end with "/".
	rootDirectory string
	// chunkSize is the size of each writer's buffer and the ChunkSize set
	// on storage writers in putContent.
	chunkSize     int
}

// Wrapper wraps `driver` with a throttler, ensuring that no more than N
// GCS actions can occur concurrently. N is taken from the "maxconcurrency"
// parameter; FromParameters supplies defaultMaxConcurrency (50) as the
// default when parsing it.
type Wrapper struct {
	baseEmbed
}

// baseEmbed embeds base.Base so that Wrapper exposes the StorageDriver
// methods of the base.Base value constructed in New.
type baseEmbed struct {
	base.Base
}

// FromParameters constructs a new Driver with a given parameters map.
//
// Required parameters:
//   - bucket
//
// Optional parameters:
//   - rootdirectory: key prefix under which objects are stored
//   - chunksize: integer (or numeric string); must be at least minChunkSize
//     and a multiple of it. Defaults to defaultChunkSize.
//   - keyfile: path to a JSON key file
//   - credentials: inline credentials map, consulted only when keyfile is
//     absent
//   - useragent: non-empty string passed to the storage client
//   - maxconcurrency: parsed by base.GetLimitFromParameter
//
// When neither keyfile nor credentials is present, the default Google token
// source is used and the driver has no email/private key for URL signing.
func FromParameters(ctx context.Context, parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
	bucket, ok := parameters["bucket"]
	if !ok || fmt.Sprint(bucket) == "" {
		return nil, fmt.Errorf("No bucket parameter provided")
	}

	rootDirectory, ok := parameters["rootdirectory"]
	if !ok {
		rootDirectory = ""
	}

	chunkSize := defaultChunkSize
	chunkSizeParam, ok := parameters["chunksize"]
	if ok {
		switch v := chunkSizeParam.(type) {
		case string:
			vv, err := strconv.Atoi(v)
			if err != nil {
				return nil, fmt.Errorf("chunksize must be an integer, %v invalid", chunkSizeParam)
			}
			chunkSize = vv
		case int, uint, int32, uint32, uint64, int64:
			chunkSize = int(reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int())
		default:
			return nil, fmt.Errorf("invalid value for chunksize: %#v", chunkSizeParam)
		}

		if chunkSize < minChunkSize {
			return nil, fmt.Errorf("chunksize %#v must be larger than or equal to %d", chunkSize, minChunkSize)
		}

		if chunkSize%minChunkSize != 0 {
			return nil, fmt.Errorf("chunksize should be a multiple of %d", minChunkSize)
		}
	}

	var ts oauth2.TokenSource
	// jwtConf stays empty (no email / private key) unless explicit
	// credentials are supplied below.
	jwtConf := new(jwt.Config)
	var options []option.ClientOption
	if keyfile, ok := parameters["keyfile"]; ok {
		jsonKey, err := os.ReadFile(fmt.Sprint(keyfile))
		if err != nil {
			return nil, err
		}
		jwtConf, err = google.JWTConfigFromJSON(jsonKey, storage.ScopeFullControl)
		if err != nil {
			return nil, err
		}
		ts = jwtConf.TokenSource(ctx)
		options = append(options, option.WithCredentialsFile(fmt.Sprint(keyfile)))
	} else if credentials, ok := parameters["credentials"]; ok {
		credentialMap, ok := credentials.(map[interface{}]interface{})
		if !ok {
			return nil, fmt.Errorf("The credentials were not specified in the correct format")
		}

		// json.Marshal cannot encode map[interface{}]interface{}, so the
		// keys are converted to strings first.
		stringMap := map[string]interface{}{}
		for k, v := range credentialMap {
			key, ok := k.(string)
			if !ok {
				return nil, fmt.Errorf("One of the credential keys was not a string: %s", fmt.Sprint(k))
			}
			stringMap[key] = v
		}

		data, err := json.Marshal(stringMap)
		if err != nil {
			return nil, fmt.Errorf("Failed to marshal gcs credentials to json: %w", err)
		}

		jwtConf, err = google.JWTConfigFromJSON(data, storage.ScopeFullControl)
		if err != nil {
			return nil, err
		}
		ts = jwtConf.TokenSource(ctx)
		options = append(options, option.WithCredentialsJSON(data))
	} else {
		var err error
		// DefaultTokenSource is a convenience method. It first calls FindDefaultCredentials,
		// then uses the credentials to construct an http.Client or an oauth2.TokenSource.
		// https://pkg.go.dev/golang.org/x/oauth2/google#hdr-Credentials
		ts, err = google.DefaultTokenSource(ctx, storage.ScopeFullControl)
		if err != nil {
			return nil, err
		}
	}

	if userAgent, ok := parameters["useragent"]; ok {
		if ua, ok := userAgent.(string); ok && ua != "" {
			options = append(options, option.WithUserAgent(ua))
		}
	}

	// Validate the concurrency limit before the storage client is created so
	// that an invalid value cannot leave an unclosed client behind.
	maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency)
	if err != nil {
		return nil, fmt.Errorf("maxconcurrency config error: %w", err)
	}

	gcs, err := storage.NewClient(ctx, options...)
	if err != nil {
		return nil, err
	}

	params := driverParameters{
		bucket:         fmt.Sprint(bucket),
		rootDirectory:  fmt.Sprint(rootDirectory),
		email:          jwtConf.Email,
		privateKey:     jwtConf.PrivateKey,
		client:         oauth2.NewClient(ctx, ts),
		chunkSize:      chunkSize,
		maxConcurrency: maxConcurrency,
		gcs:            gcs,
	}

	return New(ctx, params)
}

// New constructs a new driver from fully-populated parameters. The chunk
// size must be a positive multiple of minChunkSize. The returned driver is
// wrapped in a regulator that caps concurrent operations at
// params.maxConcurrency.
func New(ctx context.Context, params driverParameters) (storagedriver.StorageDriver, error) {
	if size := params.chunkSize; size <= 0 || size%minChunkSize != 0 {
		return nil, fmt.Errorf("Invalid chunksize: %d is not a positive multiple of %d", params.chunkSize, minChunkSize)
	}

	// Normalize the root so it is either empty or of the form "a/b/".
	root := strings.Trim(params.rootDirectory, "/")
	if root != "" {
		root += "/"
	}

	d := &driver{
		client:        params.client,
		bucket:        params.gcs.Bucket(params.bucket),
		email:         params.email,
		privateKey:    params.privateKey,
		rootDirectory: root,
		chunkSize:     params.chunkSize,
	}
	regulated := base.NewRegulator(d, params.maxConcurrency)

	return &Wrapper{
		baseEmbed: baseEmbed{
			Base: base.Base{StorageDriver: regulated},
		},
	}, nil
}

// Implement the storagedriver.StorageDriver interface

// Name returns the name under which this driver is registered ("gcs").
func (d *driver) Name() string {
	return driverName
}

// GetContent retrieves the content stored at "path" as a []byte.
// This should primarily be used for small objects.
//
// A missing object is reported as storagedriver.PathNotFoundError; any other
// error from opening or reading the object is returned unchanged.
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
	r, err := d.bucket.Object(d.pathToKey(path)).NewReader(ctx)
	if err != nil {
		// The sentinel may be wrapped by the storage client, so match it
		// with errors.Is rather than direct equality.
		if errors.Is(err, storage.ErrObjectNotExist) {
			return nil, storagedriver.PathNotFoundError{Path: path}
		}
		return nil, err
	}
	defer r.Close()

	return io.ReadAll(r)
}

// PutContent stores the []byte content at a location designated by "path".
// This should primarily be used for small objects.
//
// The object is written with blobContentType and no metadata.
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
	// The derived context is cancelled as soon as this call returns.
	uploadCtx, stop := context.WithCancel(ctx)
	defer stop()

	target := d.bucket.Object(d.pathToKey(path))
	return d.putContent(uploadCtx, target, contents, blobContentType, nil)
}

// Reader retrieves an io.ReadCloser for the content stored at "path"
// with a given byte offset.
// May be used to resume reading a stream by providing a nonzero offset.
//
// Errors:
//   - storagedriver.PathNotFoundError when the object does not exist or is
//     an uncommitted upload session.
//   - storagedriver.InvalidOffsetError when the server rejects the range and
//     offset is not exactly the object size. An offset equal to the size
//     yields an empty reader instead.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
	obj := d.bucket.Object(d.pathToKey(path))
	// NOTE(milosgajdos): If length is negative, the object is read until the end
	// See: https://pkg.go.dev/cloud.google.com/go/storage#ObjectHandle.NewRangeReader
	r, err := obj.NewRangeReader(ctx, offset, -1)
	if err != nil {
		// The sentinel may be wrapped by the storage client, so match it
		// with errors.Is rather than direct equality.
		if errors.Is(err, storage.ErrObjectNotExist) {
			return nil, storagedriver.PathNotFoundError{Path: path}
		}
		var status *googleapi.Error
		if errors.As(err, &status) {
			switch status.Code {
			case http.StatusNotFound:
				return nil, storagedriver.PathNotFoundError{Path: path}
			case http.StatusRequestedRangeNotSatisfiable:
				// Distinguish "read from exactly the end" (valid, empty)
				// from a genuinely out-of-range offset.
				attrs, attrsErr := obj.Attrs(ctx)
				if attrsErr != nil {
					return nil, attrsErr
				}
				if offset == attrs.Size {
					return io.NopCloser(bytes.NewReader([]byte{})), nil
				}
				return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
			}
		}
		return nil, err
	}
	// Contents of uncommitted uploads must not be readable.
	if r.Attrs.ContentType == uploadSessionContentType {
		r.Close()
		return nil, storagedriver.PathNotFoundError{Path: path}
	}
	return r, nil
}

// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
//
// In append mode the writer is first initialised from the existing object;
// any error from that initialisation is returned and no writer is produced.
func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (storagedriver.FileWriter, error) {
	fw := &writer{
		ctx:    ctx,
		object: d.bucket.Object(d.pathToKey(path)),
		driver: d,
		buffer: make([]byte, d.chunkSize),
	}
	if !appendMode {
		return fw, nil
	}

	if err := fw.init(ctx); err != nil {
		return nil, err
	}
	return fw, nil
}

// writer is the storagedriver.FileWriter implementation returned by
// (*driver).Writer. It buffers written bytes and uploads them through a
// resumable upload session.
type writer struct {
	// ctx is the context captured when the writer was created; Write and
	// Close use it for their uploads.
	ctx        context.Context
	// object is the handle of the destination object.
	object     *storage.ObjectHandle
	// driver is the owning driver (source of the HTTP client and chunk size).
	driver     *driver
	// size is the value reported by Size.
	size       int64
	// offset counts the bytes already sent to the upload session.
	offset     int64
	// closed is set by Close, Commit and Cancel.
	closed     bool
	// cancelled is set by Cancel.
	cancelled  bool
	// committed is set once Commit has succeeded.
	committed  bool
	// sessionURI is the resumable upload session URI; empty until a session
	// has been started.
	sessionURI string
	// buffer holds bytes not yet uploaded; only the first buffSize bytes
	// are valid.
	buffer     []byte
	buffSize   int
}

// Cancel removes any written content from this FileWriter.
//
// The writer is marked closed and cancelled, then the destination object is
// deleted. A destination that does not exist is not treated as an error.
func (w *writer) Cancel(ctx context.Context) error {
	w.closed = true
	w.cancelled = true

	// The sentinel may be wrapped by the storage client, so match it with
	// errors.Is rather than direct equality.
	if err := w.object.Delete(ctx); err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
		return err
	}
	return nil
}

// Close flushes whole chunks to the upload session and then stores the
// remaining buffered bytes, together with the session URI and offset, in an
// object of type uploadSessionContentType so the upload can be resumed
// later. Closing an already-closed writer is a no-op.
func (w *writer) Close() error {
	if w.closed {
		return nil
	}
	w.closed = true

	if err := w.writeChunk(w.ctx); err != nil {
		return err
	}

	// Copy the remaining bytes from the buffer to the upload session
	// Normally buffSize will be smaller than minChunkSize. However, in the
	// unlikely event that the upload session failed to start, this number could be higher.
	// In this case we can safely clip the remaining bytes to the minChunkSize
	if w.buffSize > minChunkSize {
		w.buffSize = minChunkSize
	}

	// commit the writes by updating the upload session
	sessionState := map[string]string{
		"Session-URI": w.sessionURI,
		"Offset":      strconv.FormatInt(w.offset, 10),
	}
	pending := w.buffer[0:w.buffSize]
	upload := func() error {
		return w.driver.putContent(w.ctx, w.object, pending, uploadSessionContentType, sessionState)
	}
	if err := retry(upload); err != nil {
		return err
	}

	// Bookkeeping is only updated once the session object has been stored.
	w.size = w.offset + int64(w.buffSize)
	w.buffSize = 0
	return nil
}

// putContent writes content to obj in a single storage write, tagging the
// object with the given content type and metadata. The error from closing
// the storage writer is returned, as that is when the write is finalised.
func (d *driver) putContent(ctx context.Context, obj *storage.ObjectHandle, content []byte, contentType string, metadata map[string]string) error {
	objWriter := obj.NewWriter(ctx)
	objWriter.ChunkSize = d.chunkSize
	objWriter.ContentType = contentType
	objWriter.Metadata = metadata

	src := bytes.NewReader(content)
	_, err := src.WriteTo(objWriter)
	if err != nil {
		return err
	}
	return objWriter.Close()
}

// Commit flushes all content written to this FileWriter and makes it
// available for future calls to StorageDriver.GetContent and
// StorageDriver.Reader.
//
// It returns an error if the writer has already been closed. When no upload
// session exists the buffered bytes are stored with a single simple upload;
// otherwise the remaining bytes are sent to the session with the final
// total size, which completes the upload.
func (w *writer) Commit(ctx context.Context) error {
	if w.closed {
		return fmt.Errorf("already closed")
	}
	w.closed = true

	// no session started yet just perform a simple upload
	if w.sessionURI == "" {
		err := retry(func() error {
			err := w.driver.putContent(ctx, w.object, w.buffer[0:w.buffSize], blobContentType, nil)
			if err != nil {
				return err
			}
			// State is only updated once the upload has succeeded.
			w.committed = true
			w.size = w.offset + int64(w.buffSize)
			w.buffSize = 0
			return nil
		})
		return err
	}
	// size is the final object size announced to the upload session.
	size := w.offset + int64(w.buffSize)
	var written int
	// loop must be performed at least once to ensure the file is committed even when
	// the buffer is empty
	for {
		n, err := w.putChunk(ctx, w.sessionURI, w.buffer[written:w.buffSize], w.offset, size)
		written += int(n)
		w.offset += n
		w.size = w.offset
		if err != nil {
			// Keep only the bytes that were not uploaded, moved to the
			// front of the buffer.
			w.buffSize = copy(w.buffer, w.buffer[written:w.buffSize])
			return err
		}
		if written == w.buffSize {
			break
		}
	}
	w.committed = true
	w.buffSize = 0
	return nil
}

// writeChunk uploads the largest prefix of the buffer whose length is a
// multiple of minChunkSize, starting an upload session first if none exists.
// Bytes that were not accepted stay at the front of the buffer. It is a
// no-op when fewer than minChunkSize bytes are buffered.
func (w *writer) writeChunk(ctx context.Context) error {
	// chunks can be uploaded only in multiples of minChunkSize, so round
	// the buffered length down to the nearest multiple
	uploadLen := (w.buffSize / minChunkSize) * minChunkSize
	if uploadLen == 0 {
		return nil
	}

	// if there is no sessionURI yet, obtain one by starting the session
	if w.sessionURI == "" {
		var err error
		if w.sessionURI, err = w.newSession(); err != nil {
			return err
		}
	}

	sent, err := w.putChunk(ctx, w.sessionURI, w.buffer[0:uploadLen], w.offset, -1)
	w.offset += sent
	if w.size < w.offset {
		w.size = w.offset
	}
	// shift the remaining bytes to the start of the buffer
	w.buffSize = copy(w.buffer, w.buffer[int(sent):w.buffSize])

	return err
}

// Write buffers p and uploads the buffer through writeChunk every time it
// becomes full. It returns the number of bytes of p accepted before any
// upload error, together with that error.
//
// Writing to a cancelled or closed writer fails without consuming any bytes.
func (w *writer) Write(p []byte) (int, error) {
	// Cancel marks the writer as closed as well, so cancellation has to be
	// tested first for its error to be reachable.
	switch {
	case w.cancelled:
		return 0, fmt.Errorf("already cancelled")
	case w.closed:
		return 0, fmt.Errorf("already closed")
	}

	var (
		written int
		err     error
	)

	for written < len(p) {
		n := copy(w.buffer[w.buffSize:], p[written:])
		w.buffSize += n
		// Flush only when the buffer is completely full.
		if w.buffSize == cap(w.buffer) {
			err = w.writeChunk(w.ctx)
			if err != nil {
				break
			}
		}
		written += n
	}
	w.size = w.offset + int64(w.buffSize)
	return written, err
}

// Size returns the number of bytes written to this FileWriter, i.e. the
// bytes already uploaded plus those still held in the buffer, as last
// recorded by the other writer methods.
func (w *writer) Size() int64 {
	return w.size
}

// init prepares the writer for appending to an existing object: it restores
// the upload offset and session URI from the object's metadata and loads the
// object's contents into the buffer (up to the buffer's length). A new
// upload session is started when the metadata holds none.
func (w *writer) init(ctx context.Context) error {
	attrs, err := w.object.Attrs(ctx)
	if err != nil {
		return err
	}

	// NOTE(milosgajdos): when PUSH abruptly finishes by
	// calling a single commit and then closes the stream
	// attrs.ContentType ends up being set to application/octet-stream
	// We must handle this case so the upload can resume.
	switch attrs.ContentType {
	case uploadSessionContentType, blobContentType:
		// resumable
	default:
		return storagedriver.PathNotFoundError{Path: w.object.ObjectName()}
	}

	// NOTE(milosgajdos): if a client creates an empty blob, then
	// closes the stream and then attempts to append to it, the offset
	// will be empty, in which case strconv.ParseInt will return error
	// See: https://pkg.go.dev/strconv#ParseInt
	var resumeAt int64
	if raw := attrs.Metadata["Offset"]; raw != "" {
		resumeAt, err = strconv.ParseInt(raw, 10, 64)
		if err != nil {
			return err
		}
	}

	r, err := w.object.NewReader(ctx)
	if err != nil {
		return err
	}
	defer r.Close()

	// Fill the buffer until it is full or the object is exhausted.
	for w.buffSize < len(w.buffer) {
		n, readErr := r.Read(w.buffer[w.buffSize:])
		w.buffSize += n
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			return readErr
		}
	}

	// NOTE(milosgajdos): if a client closes an existing session and then attempts
	// to append to an existing blob, the session will be empty; recreate it
	w.sessionURI = attrs.Metadata["Session-URI"]
	if w.sessionURI == "" {
		w.sessionURI, err = w.newSession()
		if err != nil {
			return err
		}
	}
	w.offset = resumeAt
	w.size = resumeAt + int64(w.buffSize)
	return nil
}

// request is a single attempt of an operation that retry may invoke
// repeatedly; a nil return means the attempt succeeded.
type request func() error

// retry invokes req up to maxTries times, returning nil as soon as an
// attempt succeeds.
//
// Only Google API errors with status 429 or >= 500 are retried; any other
// error is returned immediately. Between attempts it sleeps for an
// exponentially growing interval plus up to one second of random jitter.
// The error of the last attempt is returned when all attempts fail.
func retry(req request) error {
	backoff := time.Second
	var err error
	for i := 0; i < maxTries; i++ {
		err = req()
		if err == nil {
			return nil
		}

		// errors.As also recognises API errors that have been wrapped.
		var status *googleapi.Error
		if !errors.As(err, &status) || (status.Code != http.StatusTooManyRequests && status.Code < http.StatusInternalServerError) {
			return err
		}

		// Sleeping after the final attempt would only delay the failure.
		if i == maxTries-1 {
			break
		}
		time.Sleep(backoff - time.Second + (time.Duration(rand.Int31n(1000)) * time.Millisecond))
		backoff *= 2
	}
	return err
}

// Stat retrieves the FileInfo for the given path, including the current
// size in bytes and the creation time.
//
// The path is first looked up as an object; uncommitted upload sessions are
// reported as not found. If that lookup fails, the path is treated as a
// directory, which exists when at least one object has it as a key prefix.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
	// try to get as file
	if attrs, err := d.bucket.Object(d.pathToKey(path)).Attrs(ctx); err == nil {
		if attrs.ContentType == uploadSessionContentType {
			return nil, storagedriver.PathNotFoundError{Path: path}
		}
		fileInfo := storagedriver.FileInfoFields{
			Path:    path,
			Size:    attrs.Size,
			ModTime: attrs.Updated,
			IsDir:   false,
		}
		return storagedriver.FileInfoInternal{FileInfoFields: fileInfo}, nil
	}

	// try to get as folder
	dirKey := d.pathToDirKey(path)
	first, err := d.bucket.Objects(ctx, &storage.Query{Prefix: dirKey}).Next()
	if err == iterator.Done {
		return nil, storagedriver.PathNotFoundError{Path: path}
	}
	if err != nil {
		return nil, err
	}

	dirInfo := storagedriver.FileInfoFields{
		Path:  path,
		IsDir: true,
	}
	// Size and modification time are only known when an object exists at
	// the directory key itself.
	if first.Name == dirKey {
		dirInfo.Size = first.Size
		dirInfo.ModTime = first.Updated
	}
	return storagedriver.FileInfoInternal{FileInfoFields: dirInfo}, nil
}

// List returns a list of the objects that are direct descendants of the
// given path.
//
// Both objects and sub-prefixes ("directories") are reported. Uncommitted
// upload sessions and deleted objects are skipped. An empty result for any
// path other than "/" is reported as storagedriver.PathNotFoundError.
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
	it := d.bucket.Objects(ctx, &storage.Query{
		Delimiter: "/",
		Prefix:    d.pathToDirKey(path),
	})

	results := make([]string, 0, 64)
	for {
		entry, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}

		switch {
		case entry.Name != "":
			// GCS does not guarantee strong consistency between
			// DELETE and LIST operations. Check that the object is not deleted,
			// and filter out any objects with a non-zero time-deleted
			if entry.Deleted.IsZero() && entry.ContentType != uploadSessionContentType {
				results = append(results, d.keyToPath(entry.Name))
			}
		case entry.Prefix != "":
			// An entry with only a prefix stands for a sub-directory.
			results = append(results, d.keyToPath(entry.Prefix))
		}
	}

	if len(results) > 0 || path == "/" {
		return results, nil
	}
	// Treat empty response as missing directory, since we don't actually
	// have directories in Google Cloud Storage.
	return nil, storagedriver.PathNotFoundError{Path: path}
}

// Move moves an object stored at sourcePath to destPath, removing the
// original object.
//
// The object is copied and then the source is deleted. A missing source is
// reported as storagedriver.PathNotFoundError carrying sourcePath; other
// copy failures are wrapped so callers can still inspect the cause.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
	srcKey, dstKey := d.pathToKey(sourcePath), d.pathToKey(destPath)
	src := d.bucket.Object(srcKey)
	if _, err := d.bucket.Object(dstKey).CopierFrom(src).Run(ctx); err != nil {
		var status *googleapi.Error
		if errors.Is(err, storage.ErrObjectNotExist) ||
			(errors.As(err, &status) && status.Code == http.StatusNotFound) {
			// Report the caller-visible path, as every other driver
			// method does, rather than the internal object key.
			return storagedriver.PathNotFoundError{Path: sourcePath}
		}
		return fmt.Errorf("move %q to %q: %w", srcKey, dstKey, err)
	}
	// if deleting the file fails, log the error, but do not fail; the file was successfully copied,
	// and the original should eventually be cleaned when purging the uploads folder.
	if err := src.Delete(ctx); err != nil {
		logrus.Infof("error deleting %v: %v", sourcePath, err)
	}
	return nil
}

// listAll recursively lists all names of objects stored at "prefix" and its subpaths.
// The returned names are raw object keys, in the order the iterator yields
// them, with deleted objects left out.
func (d *driver) listAll(ctx context.Context, prefix string) ([]string, error) {
	query := &storage.Query{
		Prefix:   prefix,
		Versions: false,
	}
	it := d.bucket.Objects(ctx, query)

	names := make([]string, 0, 64)
	for {
		entry, err := it.Next()
		if err == iterator.Done {
			return names, nil
		}
		if err != nil {
			return nil, err
		}
		// GCS does not guarantee strong consistency between
		// DELETE and LIST operations. Check that the object is not deleted,
		// and filter out any objects with a non-zero time-deleted
		if !entry.Deleted.IsZero() {
			continue
		}
		names = append(names, entry.Name)
	}
}

// Delete recursively deletes all objects stored at "path" and its subpaths.
//
// When objects exist under the path's directory prefix they are all deleted,
// ignoring objects that have disappeared in the meantime. Otherwise the path
// is deleted as a single object, and storagedriver.PathNotFoundError is
// returned if it does not exist.
func (d *driver) Delete(ctx context.Context, path string) error {
	prefix := d.pathToDirKey(path)
	keys, err := d.listAll(ctx, prefix)
	if err != nil {
		return err
	}
	if len(keys) > 0 {
		// NOTE(milosgajdos): d.listAll calls (BucketHandle).Objects
		// See: https://pkg.go.dev/cloud.google.com/go/storage#BucketHandle.Objects
		// docs: Objects will be iterated over lexicographically by name.
		// This means we don't have to reverse order the slice; we can
		// range over the keys slice in reverse order
		for i := len(keys) - 1; i >= 0; i-- {
			err := d.bucket.Object(keys[i]).Delete(ctx)
			if err == nil {
				continue
			}
			// GCS only guarantees eventual consistency, so listAll might return
			// paths that no longer exist. If this happens, just ignore any not
			// found error. The error may arrive either as the storage
			// sentinel or as a (possibly wrapped) API error.
			var status *googleapi.Error
			notFound := errors.Is(err, storage.ErrObjectNotExist) ||
				(errors.As(err, &status) && status.Code == http.StatusNotFound)
			if !notFound {
				return err
			}
		}
		return nil
	}
	err = d.bucket.Object(d.pathToKey(path)).Delete(ctx)
	if errors.Is(err, storage.ErrObjectNotExist) {
		return storagedriver.PathNotFoundError{Path: path}
	}
	return err
}

// RedirectURL returns a URL which may be used to retrieve the content stored at
// the given path, possibly using the given options.
//
// Only GET and HEAD requests are eligible; for any other method an empty
// URL and a nil error are returned. The signed URL expires after 20 minutes.
func (d *driver) RedirectURL(r *http.Request, path string) (string, error) {
	switch r.Method {
	case http.MethodGet, http.MethodHead:
		// eligible for a signed redirect
	default:
		return "", nil
	}

	expiry := time.Now().Add(20 * time.Minute)
	signOpts := &storage.SignedURLOptions{
		GoogleAccessID: d.email,
		PrivateKey:     d.privateKey,
		Method:         r.Method,
		Expires:        expiry,
	}
	return d.bucket.SignedURL(d.pathToKey(path), signOpts)
}

// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file.
// The traversal itself is delegated to storagedriver.WalkFallback.
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
	return storagedriver.WalkFallback(ctx, d, path, f, options...)
}

// newSession starts a resumable upload session for the writer's object and
// returns the session URI taken from the "Location" response header.
// The initiating request is retried via retry.
func (w *writer) newSession() (uri string, err error) {
	// Build the query with url.Values so that object names containing
	// characters such as '&', '+', '#' or spaces are escaped correctly.
	query := url.Values{}
	query.Set("uploadType", "resumable")
	query.Set("name", w.object.ObjectName())
	u := &url.URL{
		Scheme:   "https",
		Host:     "www.googleapis.com",
		Path:     fmt.Sprintf("/upload/storage/v1/b/%v/o", w.object.BucketName()),
		RawQuery: query.Encode(),
	}
	// The request has no body, so it can safely be reused across retries.
	req, err := http.NewRequestWithContext(w.ctx, http.MethodPost, u.String(), nil)
	if err != nil {
		return "", err
	}
	req.Header.Set("X-Upload-Content-Type", blobContentType)
	req.Header.Set("Content-Length", "0")

	err = retry(func() error {
		resp, err := w.driver.client.Do(req)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		err = googleapi.CheckMediaResponse(resp)
		if err != nil {
			return err
		}
		uri = resp.Header.Get("Location")
		return nil
	})
	return uri, err
}

// putChunk uploads chunk to the resumable session at sessionURI, where from
// is the offset of the chunk's first byte within the object.
//
// totalSize is the final object size, or negative when it is not yet known.
// With an unknown size, a 308 response is treated as success and the number
// of accepted bytes is derived from its Range header.
//
// It returns the number of bytes of chunk the server accepted. Each attempt
// is retried via retry.
func (w *writer) putChunk(ctx context.Context, sessionURI string, chunk []byte, from int64, totalSize int64) (int64, error) {
	length := int64(len(chunk))
	to := from + length - 1
	size := "*"
	if totalSize >= 0 {
		size = strconv.FormatInt(totalSize, 10)
	}
	// An empty chunk carries no byte range, only the (possibly unknown) size.
	contentRange := fmt.Sprintf("bytes %d-%d/%s", from, to, size)
	if length == 0 {
		contentRange = fmt.Sprintf("bytes */%s", size)
	}

	bytesPut := int64(0)
	err := retry(func() error {
		// Build a fresh request for every attempt: the body reader is
		// consumed by a send, so a reused request would retry with no data.
		req, err := http.NewRequestWithContext(ctx, http.MethodPut, sessionURI, bytes.NewReader(chunk))
		if err != nil {
			return err
		}
		req.Header.Set("Content-Type", blobContentType)
		req.Header.Set("Content-Range", contentRange)
		req.Header.Set("Content-Length", strconv.FormatInt(length, 10))

		resp, err := w.driver.client.Do(req)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		if totalSize < 0 && resp.StatusCode == http.StatusPermanentRedirect {
			rangeValue := resp.Header.Get("Range")
			groups := rangeHeader.FindStringSubmatch(rangeValue)
			if groups == nil {
				// Without a usable Range header the accepted byte count
				// is unknown; fail instead of indexing a nil slice.
				return fmt.Errorf("gcs: missing or malformed Range header %q in resumable upload response", rangeValue)
			}
			end, err := strconv.ParseInt(groups[2], 10, 64)
			if err != nil {
				return err
			}
			bytesPut = end - from + 1
			return nil
		}
		err = googleapi.CheckMediaResponse(resp)
		if err != nil {
			return err
		}
		bytesPut = to - from + 1
		return nil
	})
	return bytesPut, err
}

// pathToKey converts a driver path into an object key: leading slashes are
// dropped from path, the root directory is prepended, and trailing slashes
// and surrounding whitespace are removed from the result.
func (d *driver) pathToKey(path string) string {
	joined := d.rootDirectory + strings.TrimLeft(path, "/")
	withoutTrailing := strings.TrimRight(joined, "/")
	return strings.TrimSpace(withoutTrailing)
}

// pathToDirKey returns the object key for path with a trailing "/", which
// is the prefix used when treating path as a directory.
func (d *driver) pathToDirKey(path string) string {
	return d.pathToKey(path) + "/"
}

// keyToPath converts an object key back into a driver path by removing the
// root directory prefix and surrounding slashes, then adding a single
// leading "/".
func (d *driver) keyToPath(key string) string {
	relative := strings.TrimPrefix(key, d.rootDirectory)
	return "/" + strings.Trim(relative, "/")
}