forked from TrueCloudLab/distribution
Merge pull request #2631 from whoshuu/feature/improve-gcs-driver
Improve gcs driver
This commit is contained in:
commit
642075f42c
5 changed files with 162 additions and 38 deletions
|
@ -100,6 +100,17 @@ storage:
|
||||||
gcs:
|
gcs:
|
||||||
bucket: bucketname
|
bucket: bucketname
|
||||||
keyfile: /path/to/keyfile
|
keyfile: /path/to/keyfile
|
||||||
|
credentials:
|
||||||
|
type: service_account
|
||||||
|
project_id: project_id_string
|
||||||
|
private_key_id: private_key_id_string
|
||||||
|
private_key: private_key_string
|
||||||
|
client_email: client@example.com
|
||||||
|
client_id: client_id_string
|
||||||
|
auth_uri: http://example.com/auth_uri
|
||||||
|
token_uri: http://example.com/token_uri
|
||||||
|
auth_provider_x509_cert_url: http://example.com/provider_cert_url
|
||||||
|
client_x509_cert_url: http://example.com/client_cert_url
|
||||||
rootdirectory: /gcs/object/name/prefix
|
rootdirectory: /gcs/object/name/prefix
|
||||||
chunksize: 5242880
|
chunksize: 5242880
|
||||||
s3:
|
s3:
|
||||||
|
@ -389,6 +400,17 @@ storage:
|
||||||
gcs:
|
gcs:
|
||||||
bucket: bucketname
|
bucket: bucketname
|
||||||
keyfile: /path/to/keyfile
|
keyfile: /path/to/keyfile
|
||||||
|
credentials:
|
||||||
|
type: service_account
|
||||||
|
project_id: project_id_string
|
||||||
|
private_key_id: private_key_id_string
|
||||||
|
private_key: private_key_string
|
||||||
|
client_email: client@example.com
|
||||||
|
client_id: client_id_string
|
||||||
|
auth_uri: http://example.com/auth_uri
|
||||||
|
token_uri: http://example.com/token_uri
|
||||||
|
auth_provider_x509_cert_url: http://example.com/provider_cert_url
|
||||||
|
client_x509_cert_url: http://example.com/client_cert_url
|
||||||
rootdirectory: /gcs/object/name/prefix
|
rootdirectory: /gcs/object/name/prefix
|
||||||
s3:
|
s3:
|
||||||
accesskey: awsaccesskey
|
accesskey: awsaccesskey
|
||||||
|
|
|
@ -2,7 +2,10 @@ package base
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"reflect"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
|
@ -15,6 +18,46 @@ type regulator struct {
|
||||||
available uint64
|
available uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLimitFromParameter takes an interface type as decoded from the YAML
|
||||||
|
// configuration and returns a uint64 representing the maximum number of
|
||||||
|
// concurrent calls given a minimum limit and default.
|
||||||
|
//
|
||||||
|
// If the parameter supplied is of an invalid type this returns an error.
|
||||||
|
func GetLimitFromParameter(param interface{}, min, def uint64) (uint64, error) {
|
||||||
|
limit := def
|
||||||
|
|
||||||
|
switch v := param.(type) {
|
||||||
|
case string:
|
||||||
|
var err error
|
||||||
|
if limit, err = strconv.ParseUint(v, 0, 64); err != nil {
|
||||||
|
return limit, fmt.Errorf("parameter must be an integer, '%v' invalid", param)
|
||||||
|
}
|
||||||
|
case uint64:
|
||||||
|
limit = v
|
||||||
|
case int, int32, int64:
|
||||||
|
val := reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Int()
|
||||||
|
// if param is negative casting to uint64 will wrap around and
|
||||||
|
// give you the hugest thread limit ever. Let's be sensible, here
|
||||||
|
if val > 0 {
|
||||||
|
limit = uint64(val)
|
||||||
|
} else {
|
||||||
|
limit = min
|
||||||
|
}
|
||||||
|
case uint, uint32:
|
||||||
|
limit = reflect.ValueOf(v).Convert(reflect.TypeOf(param)).Uint()
|
||||||
|
case nil:
|
||||||
|
// use the default
|
||||||
|
default:
|
||||||
|
return 0, fmt.Errorf("invalid value '%#v'", param)
|
||||||
|
}
|
||||||
|
|
||||||
|
if limit < min {
|
||||||
|
return min, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return limit, nil
|
||||||
|
}
|
||||||
|
|
||||||
// NewRegulator wraps the given driver and is used to regulate concurrent calls
|
// NewRegulator wraps the given driver and is used to regulate concurrent calls
|
||||||
// to the given storage driver to a maximum of the given limit. This is useful
|
// to the given storage driver to a maximum of the given limit. This is useful
|
||||||
// for storage drivers that would otherwise create an unbounded number of OS
|
// for storage drivers that would otherwise create an unbounded number of OS
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package base
|
package base
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -65,3 +66,33 @@ func TestRegulatorEnterExit(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetLimitFromParameter(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
Input interface{}
|
||||||
|
Expected uint64
|
||||||
|
Min uint64
|
||||||
|
Default uint64
|
||||||
|
Err error
|
||||||
|
}{
|
||||||
|
{"foo", 0, 5, 5, fmt.Errorf("parameter must be an integer, 'foo' invalid")},
|
||||||
|
{"50", 50, 5, 5, nil},
|
||||||
|
{"5", 25, 25, 50, nil}, // lower than Min returns Min
|
||||||
|
{nil, 50, 25, 50, nil}, // nil returns default
|
||||||
|
{812, 812, 25, 50, nil},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range tests {
|
||||||
|
t.Run(fmt.Sprint(item.Input), func(t *testing.T) {
|
||||||
|
actual, err := GetLimitFromParameter(item.Input, item.Min, item.Default)
|
||||||
|
|
||||||
|
if err != nil && item.Err != nil && err.Error() != item.Err.Error() {
|
||||||
|
t.Fatalf("GetLimitFromParameter error, expected %#v got %#v", item.Err, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if actual != item.Expected {
|
||||||
|
t.Fatalf("GetLimitFromParameter result error, expected %d got %d", item.Expected, actual)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -9,8 +9,6 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"reflect"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
|
@ -85,33 +83,9 @@ func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, e
|
||||||
rootDirectory = fmt.Sprint(rootDir)
|
rootDirectory = fmt.Sprint(rootDir)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get maximum number of threads for blocking filesystem operations,
|
maxThreads, err = base.GetLimitFromParameter(parameters["maxthreads"], minThreads, defaultMaxThreads)
|
||||||
// if specified
|
if err != nil {
|
||||||
threads := parameters["maxthreads"]
|
return nil, fmt.Errorf("maxthreads config error: %s", err.Error())
|
||||||
switch v := threads.(type) {
|
|
||||||
case string:
|
|
||||||
if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil {
|
|
||||||
return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads)
|
|
||||||
}
|
|
||||||
case uint64:
|
|
||||||
maxThreads = v
|
|
||||||
case int, int32, int64:
|
|
||||||
val := reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int()
|
|
||||||
// If threads is negative casting to uint64 will wrap around and
|
|
||||||
// give you the hugest thread limit ever. Let's be sensible, here
|
|
||||||
if val > 0 {
|
|
||||||
maxThreads = uint64(val)
|
|
||||||
}
|
|
||||||
case uint, uint32:
|
|
||||||
maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint()
|
|
||||||
case nil:
|
|
||||||
// do nothing
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads)
|
|
||||||
}
|
|
||||||
|
|
||||||
if maxThreads < minThreads {
|
|
||||||
maxThreads = minThreads
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@ package gcs
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
@ -49,6 +50,8 @@ const (
|
||||||
uploadSessionContentType = "application/x-docker-upload-session"
|
uploadSessionContentType = "application/x-docker-upload-session"
|
||||||
minChunkSize = 256 * 1024
|
minChunkSize = 256 * 1024
|
||||||
defaultChunkSize = 20 * minChunkSize
|
defaultChunkSize = 20 * minChunkSize
|
||||||
|
defaultMaxConcurrency = 50
|
||||||
|
minConcurrency = 25
|
||||||
|
|
||||||
maxTries = 5
|
maxTries = 5
|
||||||
)
|
)
|
||||||
|
@ -64,6 +67,12 @@ type driverParameters struct {
|
||||||
client *http.Client
|
client *http.Client
|
||||||
rootDirectory string
|
rootDirectory string
|
||||||
chunkSize int
|
chunkSize int
|
||||||
|
|
||||||
|
// maxConcurrency limits the number of concurrent driver operations
|
||||||
|
// to GCS, which ultimately increases reliability of many simultaneous
|
||||||
|
// pushes by ensuring we aren't DoSing our own server with many
|
||||||
|
// connections.
|
||||||
|
maxConcurrency uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -89,6 +98,16 @@ type driver struct {
|
||||||
chunkSize int
|
chunkSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wrapper wraps `driver` with a throttler, ensuring that no more than N
|
||||||
|
// GCS actions can occur concurrently. The default limit is 75.
|
||||||
|
type Wrapper struct {
|
||||||
|
baseEmbed
|
||||||
|
}
|
||||||
|
|
||||||
|
type baseEmbed struct {
|
||||||
|
base.Base
|
||||||
|
}
|
||||||
|
|
||||||
// FromParameters constructs a new Driver with a given parameters map
|
// FromParameters constructs a new Driver with a given parameters map
|
||||||
// Required parameters:
|
// Required parameters:
|
||||||
// - bucket
|
// - bucket
|
||||||
|
@ -140,6 +159,31 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ts = jwtConf.TokenSource(context.Background())
|
ts = jwtConf.TokenSource(context.Background())
|
||||||
|
} else if credentials, ok := parameters["credentials"]; ok {
|
||||||
|
credentialMap, ok := credentials.(map[interface{}]interface{})
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("The credentials were not specified in the correct format")
|
||||||
|
}
|
||||||
|
|
||||||
|
stringMap := map[string]interface{}{}
|
||||||
|
for k, v := range credentialMap {
|
||||||
|
key, ok := k.(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("One of the credential keys was not a string: %s", fmt.Sprint(k))
|
||||||
|
}
|
||||||
|
stringMap[key] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := json.Marshal(stringMap)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to marshal gcs credentials to json")
|
||||||
|
}
|
||||||
|
|
||||||
|
jwtConf, err = google.JWTConfigFromJSON(data, storage.ScopeFullControl)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ts = jwtConf.TokenSource(context.Background())
|
||||||
} else {
|
} else {
|
||||||
var err error
|
var err error
|
||||||
ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl)
|
ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl)
|
||||||
|
@ -148,13 +192,19 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("maxconcurrency config error: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
params := driverParameters{
|
params := driverParameters{
|
||||||
bucket: fmt.Sprint(bucket),
|
bucket: fmt.Sprint(bucket),
|
||||||
rootDirectory: fmt.Sprint(rootDirectory),
|
rootDirectory: fmt.Sprint(rootDirectory),
|
||||||
email: jwtConf.Email,
|
email: jwtConf.Email,
|
||||||
privateKey: jwtConf.PrivateKey,
|
privateKey: jwtConf.PrivateKey,
|
||||||
client: oauth2.NewClient(context.Background(), ts),
|
client: oauth2.NewClient(context.Background(), ts),
|
||||||
chunkSize: chunkSize,
|
chunkSize: chunkSize,
|
||||||
|
maxConcurrency: maxConcurrency,
|
||||||
}
|
}
|
||||||
|
|
||||||
return New(params)
|
return New(params)
|
||||||
|
@ -178,8 +228,12 @@ func New(params driverParameters) (storagedriver.StorageDriver, error) {
|
||||||
chunkSize: params.chunkSize,
|
chunkSize: params.chunkSize,
|
||||||
}
|
}
|
||||||
|
|
||||||
return &base.Base{
|
return &Wrapper{
|
||||||
StorageDriver: d,
|
baseEmbed: baseEmbed{
|
||||||
|
Base: base.Base{
|
||||||
|
StorageDriver: base.NewRegulator(d, params.maxConcurrency),
|
||||||
|
},
|
||||||
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -864,7 +918,7 @@ func (d *driver) context(context context.Context) context.Context {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) pathToKey(path string) string {
|
func (d *driver) pathToKey(path string) string {
|
||||||
return strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/")
|
return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) pathToDirKey(path string) string {
|
func (d *driver) pathToDirKey(path string) string {
|
||||||
|
|
Loading…
Reference in a new issue