distribution/registry/storage/driver/gcs/gcs.go

629 lines
17 KiB
Go

// Package gcs provides a storagedriver.StorageDriver implementation to
// store blobs in Google cloud storage.
//
// This package leverages the google.golang.org/cloud/storage client library
// for interfacing with gcs.
//
// Because gcs is a key-value store, the Stat call does not support last modification
// time for directories (directories are an abstraction for key-value stores).
//
// Keep in mind that gcs guarantees only eventual consistency, so do not assume
// that a successful write will mean immediate access to the data written (although
// in most regions a new object put has guaranteed read after write). The only true
// guarantee is that once you call Stat and receive a certain file size, that much of
// the file is already accessible.
//
// +build include_gcs
package gcs
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"sort"
"strings"
"time"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
storageapi "google.golang.org/api/storage/v1"
"google.golang.org/cloud"
"google.golang.org/cloud/storage"
ctx "github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
)
// driverName is the name under which this driver is registered with the
// storage driver factory and the value returned by (*driver).Name.
const driverName = "gcs"
// dummyProjectID is the placeholder project ID handed to cloud.WithContext
// when building the context used for GCS calls.
const dummyProjectID = "<unknown>"
// driverParameters encapsulates all of the driver parameters after all values
// have been set.
//
// NOTE: FromParameters builds this struct with an unkeyed (positional)
// composite literal, so the field order below must not change.
type driverParameters struct {
// bucket is the name of the bucket the driver operates on.
bucket string
// keyfile is the path of a JSON key file read by New; when empty, New falls
// back to google.DefaultTokenSource.
keyfile string
// rootDirectory is the key prefix under which objects are stored; New
// normalizes its leading and trailing slashes.
rootDirectory string
}
// init registers the gcs driver factory under driverName so that the driver
// can be created by name through the factory package.
func init() {
factory.Register(driverName, &gcsDriverFactory{})
}
// gcsDriverFactory implements the factory.StorageDriverFactory interface.
type gcsDriverFactory struct{}

// Create builds a StorageDriver from the given parameters map by delegating
// to FromParameters.
func (f *gcsDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
	return FromParameters(parameters)
}
// driver is a storagedriver.StorageDriver implementation backed by GCS.
// Objects are stored at absolute keys in the provided bucket.
type driver struct {
// client is the OAuth2-authenticated HTTP client created in New; it is used
// for direct HTTP requests and for the GCS API contexts/services.
client *http.Client
// bucket is the name of the bucket all objects live in.
bucket string
// email is the service-account email taken from the JWT config when a
// keyfile is supplied; URLFor uses it as the GoogleAccessID.
email string
// privateKey is the service-account private key taken from the JWT config
// when a keyfile is supplied; URLFor reports ErrUnsupportedMethod when nil.
privateKey []byte
// rootDirectory is the key prefix for all objects; New normalizes it to be
// either empty or to end with exactly one "/" and have no leading "/".
rootDirectory string
}
// FromParameters constructs a new Driver from a generic parameters map.
//
// Required parameters:
// - bucket
//
// Optional parameters (treated as "" when absent):
// - keyfile
// - rootdirectory
//
// Every value is converted to a string with fmt.Sprint before being passed
// on to New.
func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
	bucket, found := parameters["bucket"]
	if !found || fmt.Sprint(bucket) == "" {
		return nil, fmt.Errorf("No bucket parameter provided")
	}

	// optional yields the string form of a parameter, or "" when it is unset.
	optional := func(key string) string {
		if value, present := parameters[key]; present {
			return fmt.Sprint(value)
		}
		return ""
	}

	return New(driverParameters{
		bucket:        fmt.Sprint(bucket),
		keyfile:       optional("keyfile"),
		rootDirectory: optional("rootdirectory"),
	})
}
// New constructs a new driver from already-validated parameters.
//
// The root directory is normalized so that it is either empty or has no
// leading slash and exactly one trailing slash. Credentials come from the
// JSON key file named by params.keyfile when set (which also enables signed
// URLs via URLFor), otherwise from google.DefaultTokenSource.
//
// It returns an error when the default token source cannot be obtained, or
// when the key file cannot be read or parsed.
func New(params driverParameters) (storagedriver.StorageDriver, error) {
	rootDirectory := strings.Trim(params.rootDirectory, "/")
	if rootDirectory != "" {
		rootDirectory += "/"
	}
	d := &driver{
		bucket:        params.bucket,
		rootDirectory: rootDirectory,
	}

	var ts oauth2.TokenSource
	if params.keyfile == "" {
		defaultSource, err := google.DefaultTokenSource(context.Background(), storage.ScopeFullControl)
		if err != nil {
			return nil, err
		}
		ts = defaultSource
	} else {
		jsonKey, err := ioutil.ReadFile(params.keyfile)
		if err != nil {
			return nil, err
		}
		conf, err := google.JWTConfigFromJSON(
			jsonKey,
			storage.ScopeFullControl,
		)
		if err != nil {
			return nil, err
		}
		ts = conf.TokenSource(context.Background())
		// Keep the signing identity around; URLFor needs it to sign URLs.
		d.email = conf.Email
		d.privateKey = conf.PrivateKey
	}

	// oauth2.NewClient does not return an error, so no further check is
	// needed once a token source has been obtained.
	d.client = oauth2.NewClient(context.Background(), ts)

	return &base.Base{
		StorageDriver: d,
	}, nil
}
// Name returns the name of this driver (driverName). It is the first of the
// methods implementing the storagedriver.StorageDriver interface.
func (d *driver) Name() string {
return driverName
}
// GetContent reads the whole object stored at "path" into memory and returns
// it as a []byte. It is intended for small objects only.
//
// Errors from opening the stream (see ReadStream) or from reading it are
// returned as-is; no partial content is returned on failure.
func (d *driver) GetContent(context ctx.Context, path string) ([]byte, error) {
	stream, err := d.ReadStream(context, path, 0)
	if err != nil {
		return nil, err
	}
	defer stream.Close()

	content, readErr := ioutil.ReadAll(stream)
	if readErr != nil {
		return nil, readErr
	}
	return content, nil
}
// PutContent stores the []byte content at a location designated by "path".
// This should primarily be used for small objects.
//
// The error returned by closing the GCS writer is propagated to the caller:
// the writer may only report an upload failure when it is closed, so ignoring
// that error could report success for content that was never stored.
func (d *driver) PutContent(context ctx.Context, path string, contents []byte) error {
	wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path))
	wc.ContentType = "application/octet-stream"
	if _, err := wc.Write(contents); err != nil {
		// Abort instead of closing normally so that a truncated object is
		// not committed under the target key.
		wc.CloseWithError(err)
		return err
	}
	return wc.Close()
}
// ReadStream returns an io.ReadCloser for the content stored at "path",
// starting at the given byte offset. A nonzero offset may be used to resume
// reading a stream.
//
// Results by HTTP status of the underlying request:
//   - 404: storagedriver.PathNotFoundError
//   - 416: an empty reader when offset equals the object size, otherwise
//     storagedriver.InvalidOffsetError
//   - any other non-2xx status: a generic error including the status
//   - 2xx: the response body, which the caller must close
func (d *driver) ReadStream(context ctx.Context, path string, offset int64) (io.ReadCloser, error) {
	key := d.pathToKey(path)

	// The request is built by hand (mirroring
	// google.golang.org/cloud/storage#NewReader) so that the additional
	// "Range" header can be set.
	objectURL := &url.URL{
		Scheme: "https",
		Host:   "storage.googleapis.com",
		Path:   fmt.Sprintf("/%s/%s", d.bucket, key),
	}
	req, err := http.NewRequest("GET", objectURL.String(), nil)
	if err != nil {
		return nil, err
	}
	if offset > 0 {
		req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset))
	}

	res, err := d.client.Do(req)
	if err != nil {
		return nil, err
	}

	switch {
	case res.StatusCode == http.StatusNotFound:
		res.Body.Close()
		return nil, storagedriver.PathNotFoundError{Path: path}

	case res.StatusCode == http.StatusRequestedRangeNotSatisfiable:
		res.Body.Close()
		// The range was rejected: reading from exactly the end of the object
		// is valid and yields an empty stream, anything else is an error.
		obj, err := storage.StatObject(d.context(context), d.bucket, key)
		if err != nil {
			return nil, err
		}
		if offset == int64(obj.Size) {
			return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
		}
		return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}

	case res.StatusCode < 200 || res.StatusCode > 299:
		res.Body.Close()
		return nil, fmt.Errorf("storage: can't read object %v/%v, status code: %v", d.bucket, key, res.Status)
	}

	return res.Body, nil
}
// WriteStream stores the contents of the provided io.Reader at a
// location designated by the given path.
// May be used to resume writing a stream by providing a nonzero offset.
// The offset must be no larger than the CurrentSize for this path.
//
// A negative offset yields storagedriver.InvalidOffsetError. An offset of 0
// rewrites the object from scratch via writeCompletely. For a positive
// offset the new data is uploaded as a temporary "<key>#part-N#" object and
// then composed onto the existing object; the temporary part is deleted when
// the function returns.
//
// The returned count is the number of bytes consumed from reader: bytes that
// were skipped plus, when the compose succeeds, the bytes uploaded.
func (d *driver) WriteStream(context ctx.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
if offset < 0 {
return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}
if offset == 0 {
return d.writeCompletely(context, path, 0, reader)
}
service, err := storageapi.New(d.client)
if err != nil {
return 0, err
}
objService := storageapi.NewObjectsService(service)
// Fetch the metadata of the existing object, retrying on errors that retry
// treats as transient.
var obj *storageapi.Object
err = retry(5, func() error {
o, err := objService.Get(d.bucket, d.pathToKey(path)).Do()
obj = o
return err
})
if err != nil {
return 0, err
}
// cannot append more chunks, so redo from scratch
// NOTE(review): 1023 presumably relates to GCS's limit on components of a
// composite object — confirm against the GCS compose documentation.
if obj.ComponentCount >= 1023 {
return d.writeCompletely(context, path, offset, reader)
}
// skip from reader: when the stored object is already longer than offset,
// the first objSize-offset bytes of reader are discarded (skip is a no-op
// when that difference is not positive).
objSize := int64(obj.Size)
nn, err := skip(reader, objSize-offset)
if err != nil {
return nn, err
}
// Size <= offset
// Upload the remaining data as a temporary part object.
partName := fmt.Sprintf("%v#part-%d#", d.pathToKey(path), obj.ComponentCount)
gcsContext := d.context(context)
wc := storage.NewWriter(gcsContext, d.bucket, partName)
wc.ContentType = "application/octet-stream"
// When the stored object is shorter than offset, the gap is filled with
// zero bytes at the start of the part.
if objSize < offset {
err = writeZeros(wc, offset-objSize)
if err != nil {
wc.CloseWithError(err)
return nn, err
}
}
n, err := io.Copy(wc, reader)
if err != nil {
wc.CloseWithError(err)
return nn, err
}
err = wc.Close()
if err != nil {
return nn, err
}
// wc was closed successfully, so the temporary part exists, schedule it for deletion at the end
// of the function (the result of the deletion is not checked)
defer storage.DeleteObject(gcsContext, d.bucket, partName)
// Compose the existing object and the new part into the destination, pinning
// both sources to the generations observed above.
req := &storageapi.ComposeRequest{
Destination: &storageapi.Object{Bucket: obj.Bucket, Name: obj.Name, ContentType: obj.ContentType},
SourceObjects: []*storageapi.ComposeRequestSourceObjects{
{
Name: obj.Name,
Generation: obj.Generation,
}, {
Name: partName,
Generation: wc.Object().Generation,
}},
}
err = retry(5, func() error { _, err := objService.Compose(d.bucket, obj.Name, req).Do(); return err })
// Only count the uploaded bytes once they are part of the final object.
if err == nil {
nn = nn + n
}
return nn, err
}
// request is a single attempt of an operation that retry may repeat.
type request func() error

// retry calls req up to maxTries times and returns nil as soon as one attempt
// succeeds.
//
// Only *googleapi.Error values with status code 429 or >= 500 are retried;
// any other error is returned immediately. Between attempts it sleeps for an
// exponentially growing backoff (0s, 1s, 3s, 7s, ... each plus up to one
// second of random jitter). No sleep happens after the final attempt, so the
// last error is reported to the caller without an extra delay.
//
// When maxTries is not positive, req is never called and nil is returned.
func retry(maxTries int, req request) error {
	backoff := time.Second
	var err error
	for i := 0; i < maxTries; i++ {
		err = req()
		if err == nil {
			return nil
		}

		status, ok := err.(*googleapi.Error)
		if !ok || (status.Code != 429 && status.Code < http.StatusInternalServerError) {
			return err
		}

		// Nothing is left to wait for once the attempts are exhausted.
		if i == maxTries-1 {
			break
		}

		time.Sleep(backoff - time.Second + (time.Duration(rand.Int31n(1000)) * time.Millisecond))
		// Stop doubling after the first few attempts to bound the delay.
		if i <= 4 {
			backoff = backoff * 2
		}
	}
	return err
}
// writeCompletely rewrites the object at path from scratch.
//
// When offset is positive, the first offset bytes of the existing object are
// copied into the new object (padded with zeros if the existing object is
// shorter than offset) before the contents of reader are appended.
//
// It returns the number of bytes consumed from reader. On any failure the
// upload is aborted with CloseWithError instead of being closed normally, so
// that a partially written object is not committed over the existing one.
// The error from the final Close is returned as well, since the writer may
// only report an upload failure at that point.
func (d *driver) writeCompletely(context ctx.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
	wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path))
	wc.ContentType = "application/octet-stream"

	// Copy the first offset bytes of the existing contents
	// (padded with zeros if needed) into the writer
	if offset > 0 {
		existing, err := d.ReadStream(context, path, 0)
		if err != nil {
			wc.CloseWithError(err)
			return 0, err
		}
		defer existing.Close()

		n, err := io.CopyN(wc, existing, offset)
		if err == io.EOF {
			// The existing object ended before offset: fill the gap.
			err = writeZeros(wc, offset-n)
		}
		if err != nil {
			wc.CloseWithError(err)
			return 0, err
		}
	}

	totalRead, err = io.Copy(wc, reader)
	if err != nil {
		wc.CloseWithError(err)
		return totalRead, err
	}
	return totalRead, wc.Close()
}
// skip reads and discards count bytes from reader, returning how many bytes
// were actually discarded along with any error from io.CopyN (including
// io.EOF when reader ends early). A count that is zero or negative is a
// no-op that leaves reader untouched.
func skip(reader io.Reader, count int64) (int64, error) {
	if count > 0 {
		return io.CopyN(ioutil.Discard, reader, count)
	}
	return 0, nil
}
// writeZeros writes count zero bytes to wc in chunks of at most 32 KiB.
// It stops at, and returns, the first error reported by wc.Write. A count
// that is zero or negative results in no writes at all.
func writeZeros(wc io.Writer, count int64) error {
	zeros := make([]byte, 32*1024)
	for remaining := count; remaining > 0; {
		chunk := int64(len(zeros))
		if remaining < chunk {
			chunk = remaining
		}
		written, err := wc.Write(zeros[:chunk])
		if err != nil {
			return err
		}
		// Advance by what the writer actually accepted.
		remaining -= int64(written)
	}
	return nil
}
// Stat retrieves the FileInfo for the given path.
//
// The path is first looked up as an object; on success its size and update
// time are reported. If that lookup fails for any reason, the path is treated
// as a directory: it exists when at least one object is stored under the
// "<key>/" prefix. For directories, size and modification time are only
// filled in when the first listed object is named exactly "<key>/".
//
// storagedriver.PathNotFoundError is returned when neither lookup finds
// anything.
func (d *driver) Stat(context ctx.Context, path string) (storagedriver.FileInfo, error) {
	gcsContext := d.context(context)

	// Attempt 1: the path names a plain object.
	if obj, err := storage.StatObject(gcsContext, d.bucket, d.pathToKey(path)); err == nil {
		fileFields := storagedriver.FileInfoFields{
			Path:    path,
			Size:    obj.Size,
			ModTime: obj.Updated,
			IsDir:   false,
		}
		return storagedriver.FileInfoInternal{FileInfoFields: fileFields}, nil
	}

	// Attempt 2: the path names a "directory", i.e. a key prefix.
	dirKey := d.pathToDirKey(path)
	query := &storage.Query{
		Prefix:     dirKey,
		MaxResults: 1,
	}
	listing, err := storage.ListObjects(gcsContext, d.bucket, query)
	if err != nil {
		return nil, err
	}
	if len(listing.Results) < 1 {
		return nil, storagedriver.PathNotFoundError{Path: path}
	}

	dirFields := storagedriver.FileInfoFields{
		Path:  path,
		IsDir: true,
	}
	if first := listing.Results[0]; first.Name == dirKey {
		dirFields.Size = first.Size
		dirFields.ModTime = first.Updated
	}
	return storagedriver.FileInfoInternal{FileInfoFields: dirFields}, nil
}
// List returns the paths of the objects and sub-directories that are direct
// descendants of the given path.
//
// Objects carrying a deletion time and objects whose name ends with '#'
// (uploaded parts) are left out. For any path other than "/", an empty
// result is reported as storagedriver.PathNotFoundError.
func (d *driver) List(context ctx.Context, path string) ([]string, error) {
	query := &storage.Query{
		Delimiter: "/",
		Prefix:    d.pathToDirKey(path),
	}
	entries := make([]string, 0, 64)

	// Walk through every result page; Next is nil after the last one.
	for query != nil {
		page, err := storage.ListObjects(d.context(context), d.bucket, query)
		if err != nil {
			return nil, err
		}

		for _, object := range page.Results {
			// GCS does not guarantee strong consistency between DELETE and
			// LIST operations, so skip any object with a non-zero
			// time-deleted.
			if !object.Deleted.IsZero() {
				continue
			}
			// Skip objects with names that end with '#' (these are uploaded parts).
			name := object.Name
			if name[len(name)-1] == '#' {
				continue
			}
			entries = append(entries, d.keyToPath(name))
		}

		for _, prefix := range page.Prefixes {
			entries = append(entries, d.keyToPath(prefix))
		}

		query = page.Next
	}

	if path != "/" && len(entries) == 0 {
		// Google Cloud Storage has no real directories, so an empty listing
		// is treated as a missing directory.
		return nil, storagedriver.PathNotFoundError{Path: path}
	}
	return entries, nil
}
// Move moves an object stored at sourcePath to destPath, removing the
// original object.
//
// When objects exist under the "<sourcePath>/" prefix, the whole tree is
// moved: every object is copied to the corresponding key under destPath and
// the originals are deleted afterwards. If a copy fails, the copies made so
// far are removed on a best-effort basis and the copy error is returned.
//
// Otherwise sourcePath is moved as a single object. A 404 reported by the
// API while copying is translated into storagedriver.PathNotFoundError; any
// other error is returned unchanged.
func (d *driver) Move(context ctx.Context, sourcePath string, destPath string) error {
	prefix := d.pathToDirKey(sourcePath)
	gcsContext := d.context(context)
	keys, err := d.listAll(gcsContext, prefix)
	if err != nil {
		return err
	}

	if len(keys) > 0 {
		destPrefix := d.pathToDirKey(destPath)
		copies := make([]string, 0, len(keys))
		sort.Strings(keys)
		var err error
		for _, key := range keys {
			dest := destPrefix + key[len(prefix):]
			_, err = storage.CopyObject(gcsContext, d.bucket, key, d.bucket, dest, nil)
			if err != nil {
				break
			}
			copies = append(copies, dest)
		}
		// if an error occurred, attempt to cleanup the copies made
		if err != nil {
			for i := len(copies) - 1; i >= 0; i-- {
				// Best effort: the copy error is what gets reported.
				_ = storage.DeleteObject(gcsContext, d.bucket, copies[i])
			}
			return err
		}
		// delete originals, remembering the last failure (if any)
		for i := len(keys) - 1; i >= 0; i-- {
			err2 := storage.DeleteObject(gcsContext, d.bucket, keys[i])
			if err2 != nil {
				err = err2
			}
		}
		return err
	}

	_, err = storage.CopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
	if err != nil {
		// Use the comma-ok form: err is not guaranteed to be a
		// *googleapi.Error, and an unchecked assertion would panic.
		if status, ok := err.(*googleapi.Error); ok && status.Code == http.StatusNotFound {
			return storagedriver.PathNotFoundError{Path: sourcePath}
		}
		return err
	}
	return storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath))
}
// listAll returns the names of all objects whose key starts with prefix,
// following result pages until the listing is exhausted. Objects that carry
// a deletion time are left out.
func (d *driver) listAll(context context.Context, prefix string) ([]string, error) {
	names := make([]string, 0, 64)
	query := &storage.Query{
		Prefix:   prefix,
		Versions: false,
	}

	// Next is nil once the last page has been fetched.
	for query != nil {
		page, err := storage.ListObjects(d.context(context), d.bucket, query)
		if err != nil {
			return nil, err
		}
		for _, obj := range page.Results {
			// GCS does not guarantee strong consistency between DELETE and
			// LIST operations, so skip any object with a non-zero
			// time-deleted.
			if !obj.Deleted.IsZero() {
				continue
			}
			names = append(names, obj.Name)
		}
		query = page.Next
	}
	return names, nil
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
//
// When objects exist under the "<path>/" prefix they are deleted in reverse
// lexical order, stopping at the first failure. Otherwise path is deleted as
// a single object; a 404 reported by the API is translated into
// storagedriver.PathNotFoundError and any other error is returned unchanged.
func (d *driver) Delete(context ctx.Context, path string) error {
	prefix := d.pathToDirKey(path)
	gcsContext := d.context(context)
	keys, err := d.listAll(gcsContext, prefix)
	if err != nil {
		return err
	}

	if len(keys) > 0 {
		sort.Sort(sort.Reverse(sort.StringSlice(keys)))
		for _, key := range keys {
			if err := storage.DeleteObject(gcsContext, d.bucket, key); err != nil {
				return err
			}
		}
		return nil
	}

	err = storage.DeleteObject(gcsContext, d.bucket, d.pathToKey(path))
	if err != nil {
		// Use the comma-ok form: err is not guaranteed to be a
		// *googleapi.Error, and an unchecked assertion would panic.
		if status, ok := err.(*googleapi.Error); ok && status.Code == http.StatusNotFound {
			return storagedriver.PathNotFoundError{Path: path}
		}
	}
	return err
}
// URLFor returns a signed URL which may be used to retrieve the content
// stored at the given path.
//
// Recognized options:
//   - "method": must be the string "GET" or "HEAD" (default "GET"); anything
//     else yields ErrUnsupportedMethod.
//   - "expiry": a time.Time at which the URL expires (default 20 minutes from
//     now); values of any other type are ignored.
//
// ErrUnsupportedMethod is also returned when the driver has no privateKey.
func (d *driver) URLFor(context ctx.Context, path string, options map[string]interface{}) (string, error) {
	if d.privateKey == nil {
		return "", storagedriver.ErrUnsupportedMethod{}
	}

	key := d.pathToKey(path)

	httpMethod := "GET"
	if raw, present := options["method"]; present {
		requested, isString := raw.(string)
		if !isString || (requested != "GET" && requested != "HEAD") {
			return "", storagedriver.ErrUnsupportedMethod{}
		}
		httpMethod = requested
	}

	expiry := time.Now().Add(20 * time.Minute)
	if raw, present := options["expiry"]; present {
		if at, isTime := raw.(time.Time); isTime {
			expiry = at
		}
	}

	return storage.SignedURL(d.bucket, key, &storage.SignedURLOptions{
		GoogleAccessID: d.email,
		PrivateKey:     d.privateKey,
		Method:         httpMethod,
		Expires:        expiry,
	})
}
// context wraps the given registry context with cloud.WithContext, attaching
// dummyProjectID and the driver's HTTP client, and returns the result for use
// with the storage package functions.
//
// Note that the parameter is named "context" and therefore shadows the
// imported context package inside the body; the return type in the signature
// still refers to the package.
func (d *driver) context(context ctx.Context) context.Context {
return cloud.WithContext(context, dummyProjectID, d.client)
}
// pathToKey converts a driver path into an object key: leading slashes are
// stripped from path, the root directory is prepended, and any trailing
// slashes are removed from the result.
func (d *driver) pathToKey(path string) string {
	relative := strings.TrimLeft(path, "/")
	key := d.rootDirectory + relative
	return strings.TrimRight(key, "/")
}
// pathToDirKey converts a driver path into a "directory" key prefix: the
// object key produced by pathToKey followed by a single "/".
func (d *driver) pathToDirKey(path string) string {
return d.pathToKey(path) + "/"
}
// keyToPath converts an object key back into a driver path: the root
// directory prefix is removed if present, surrounding slashes are trimmed,
// and a single leading "/" is added.
func (d *driver) keyToPath(key string) string {
	relative := strings.TrimPrefix(key, d.rootDirectory)
	return "/" + strings.Trim(relative, "/")
}