forked from TrueCloudLab/distribution
Support OSS driver
Signed-off-by: Li Yi <denverdino@gmail.com>
This commit is contained in:
parent
7dc8d4a26b
commit
9e4975d8ff
4 changed files with 989 additions and 0 deletions
|
@ -27,6 +27,7 @@ import (
|
|||
_ "github.com/docker/distribution/registry/storage/driver/filesystem"
|
||||
_ "github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||
_ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront"
|
||||
_ "github.com/docker/distribution/registry/storage/driver/oss"
|
||||
_ "github.com/docker/distribution/registry/storage/driver/s3"
|
||||
_ "github.com/docker/distribution/registry/storage/driver/swift"
|
||||
"github.com/docker/distribution/uuid"
|
||||
|
|
23
docs/storage-drivers/oss.md
Executable file
23
docs/storage-drivers/oss.md
Executable file
|
@ -0,0 +1,23 @@
|
|||
<!--GITHUB
|
||||
page_title: OSS storage driver
|
||||
page_description: Explains how to use the OSS storage drivers
|
||||
page_keywords: registry, service, driver, images, storage, OSS
|
||||
IGNORES-->
|
||||
|
||||
# OSS storage driver
|
||||
|
||||
An implementation of the `storagedriver.StorageDriver` interface which uses Aliyun OSS for object storage.
|
||||
|
||||
## Parameters
|
||||
|
||||
`accesskeyid`: Your access key ID.
|
||||
|
||||
`accesskeysecret`: Your access key secret.
|
||||
|
||||
`region`: The name of the aws region in which you would like to store objects (for example `oss-cn-beijing`). For a list of regions, you can look at http://docs.aliyun.com/?spm=5176.383663.9.2.0JzTP8#/oss/product-documentation/domain-region
|
||||
|
||||
`bucket`: The name of your OSS bucket where you wish to store objects (needs to already be created prior to driver initialization).
|
||||
|
||||
`chunksize`: (optional) The default part size for multipart uploads (performed by WriteStream) to OSS. The default is 10 MB. Keep in mind that the minimum part size for OSS is 5MB. You might experience better performance for larger chunk sizes depending on the speed of your connection to OSS.
|
||||
|
||||
`rootdirectory`: (optional) The root directory tree in which all registry files will be stored. Defaults to the empty string (bucket root).
|
813
registry/storage/driver/oss/oss.go
Executable file
813
registry/storage/driver/oss/oss.go
Executable file
|
@ -0,0 +1,813 @@
|
|||
// Package oss provides a storagedriver.StorageDriver implementation to
|
||||
// store blobs in Aliyun OSS cloud storage.
|
||||
//
|
||||
// This package leverages the denverdino/aliyungo client library for interfacing with
|
||||
// oss.
|
||||
//
|
||||
// Because OSS is a key, value store the Stat call does not support last modification
|
||||
// time for directories (directories are an abstraction for key, value stores)
|
||||
//
|
||||
// Keep in mind that OSS guarantees only eventual consistency, so do not assume
|
||||
// that a successful write will mean immediate access to the data written (although
|
||||
// in most regions a new object put has guaranteed read after write). The only true
|
||||
// guarantee is that once you call Stat and receive a certain file size, that much of
|
||||
// the file is already accessible.
|
||||
package oss
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/docker/distribution/context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/denverdino/aliyungo/oss"
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/distribution/registry/storage/driver/base"
|
||||
"github.com/docker/distribution/registry/storage/driver/factory"
|
||||
)
|
||||
|
||||
const driverName = "oss"
|
||||
|
||||
// minChunkSize defines the minimum multipart upload chunk size
|
||||
// OSS API requires multipart upload chunks to be at least 5MB
|
||||
const minChunkSize = 5 << 20
|
||||
|
||||
const defaultChunkSize = 2 * minChunkSize
|
||||
|
||||
// listMax is the largest amount of objects you can request from OSS in a list call
|
||||
const listMax = 1000
|
||||
|
||||
//DriverParameters A struct that encapsulates all of the driver parameters after all values have been set
|
||||
type DriverParameters struct {
|
||||
AccessKeyId string
|
||||
AccessKeySecret string
|
||||
Bucket string
|
||||
Region oss.Region
|
||||
Internal bool
|
||||
Encrypt bool
|
||||
Secure bool
|
||||
ChunkSize int64
|
||||
RootDirectory string
|
||||
}
|
||||
|
||||
func init() {
|
||||
factory.Register(driverName, &ossDriverFactory{})
|
||||
}
|
||||
|
||||
// ossDriverFactory implements the factory.StorageDriverFactory interface
|
||||
type ossDriverFactory struct{}
|
||||
|
||||
func (factory *ossDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||
return FromParameters(parameters)
|
||||
}
|
||||
|
||||
type driver struct {
|
||||
Client *oss.Client
|
||||
Bucket *oss.Bucket
|
||||
ChunkSize int64
|
||||
Encrypt bool
|
||||
RootDirectory string
|
||||
|
||||
pool sync.Pool // pool []byte buffers used for WriteStream
|
||||
zeros []byte // shared, zero-valued buffer used for WriteStream
|
||||
}
|
||||
|
||||
type baseEmbed struct {
|
||||
base.Base
|
||||
}
|
||||
|
||||
// Driver is a storagedriver.StorageDriver implementation backed by Aliyun OSS
|
||||
// Objects are stored at absolute keys in the provided bucket.
|
||||
type Driver struct {
|
||||
baseEmbed
|
||||
}
|
||||
|
||||
// FromParameters constructs a new Driver with a given parameters map
|
||||
// Required parameters:
|
||||
// - accesskey
|
||||
// - secretkey
|
||||
// - region
|
||||
// - bucket
|
||||
// - encrypt
|
||||
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
||||
// Providing no values for these is valid in case the user is authenticating
|
||||
// with an IAM on an ec2 instance (in which case the instance credentials will
|
||||
// be summoned when GetAuth is called)
|
||||
accessKey, ok := parameters["accesskeyid"]
|
||||
if !ok {
|
||||
accessKey = ""
|
||||
}
|
||||
secretKey, ok := parameters["accesskeysecret"]
|
||||
if !ok {
|
||||
secretKey = ""
|
||||
}
|
||||
|
||||
regionName, ok := parameters["region"]
|
||||
if !ok || fmt.Sprint(regionName) == "" {
|
||||
return nil, fmt.Errorf("No region parameter provided")
|
||||
}
|
||||
|
||||
bucket, ok := parameters["bucket"]
|
||||
if !ok || fmt.Sprint(bucket) == "" {
|
||||
return nil, fmt.Errorf("No bucket parameter provided")
|
||||
}
|
||||
|
||||
internalBool := false
|
||||
internal, ok := parameters["internal"]
|
||||
if ok {
|
||||
internalBool, ok = internal.(bool)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("The encrypt parameter should be a boolean")
|
||||
}
|
||||
}
|
||||
|
||||
encryptBool := false
|
||||
encrypt, ok := parameters["encrypt"]
|
||||
if ok {
|
||||
encryptBool, ok = encrypt.(bool)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("The encrypt parameter should be a boolean")
|
||||
}
|
||||
}
|
||||
|
||||
secureBool := true
|
||||
secure, ok := parameters["secure"]
|
||||
if ok {
|
||||
secureBool, ok = secure.(bool)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("The secure parameter should be a boolean")
|
||||
}
|
||||
}
|
||||
|
||||
chunkSize := int64(defaultChunkSize)
|
||||
chunkSizeParam, ok := parameters["chunksize"]
|
||||
if ok {
|
||||
switch v := chunkSizeParam.(type) {
|
||||
case string:
|
||||
vv, err := strconv.ParseInt(v, 0, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
|
||||
}
|
||||
chunkSize = vv
|
||||
case int64:
|
||||
chunkSize = v
|
||||
case int, uint, int32, uint32, uint64:
|
||||
chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam)
|
||||
}
|
||||
|
||||
if chunkSize < minChunkSize {
|
||||
return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
|
||||
}
|
||||
}
|
||||
|
||||
rootDirectory, ok := parameters["rootdirectory"]
|
||||
if !ok {
|
||||
rootDirectory = ""
|
||||
}
|
||||
|
||||
params := DriverParameters{
|
||||
AccessKeyId: fmt.Sprint(accessKey),
|
||||
AccessKeySecret: fmt.Sprint(secretKey),
|
||||
Bucket: fmt.Sprint(bucket),
|
||||
Region: oss.Region(fmt.Sprint(regionName)),
|
||||
ChunkSize: chunkSize,
|
||||
RootDirectory: fmt.Sprint(rootDirectory),
|
||||
Encrypt: encryptBool,
|
||||
Secure: secureBool,
|
||||
Internal: internalBool,
|
||||
}
|
||||
|
||||
return New(params)
|
||||
}
|
||||
|
||||
// New constructs a new Driver with the given AWS credentials, region, encryption flag, and
|
||||
// bucketName
|
||||
func New(params DriverParameters) (*Driver, error) {
|
||||
|
||||
client := oss.NewOSSClient(params.Region, params.Internal, params.AccessKeyId, params.AccessKeySecret)
|
||||
bucket := client.Bucket(params.Bucket)
|
||||
|
||||
// Validate that the given credentials have at least read permissions in the
|
||||
// given bucket scope.
|
||||
if _, err := bucket.List(strings.TrimRight(params.RootDirectory, "/"), "", "", 1); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO Currently multipart uploads have no timestamps, so this would be unwise
|
||||
// if you initiated a new OSS client while another one is running on the same bucket.
|
||||
// multis, _, err := bucket.ListMulti("", "")
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
// for _, multi := range multis {
|
||||
// err := multi.Abort()
|
||||
// //TODO appropriate to do this error checking?
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// }
|
||||
|
||||
d := &driver{
|
||||
Client: client,
|
||||
Bucket: bucket,
|
||||
ChunkSize: params.ChunkSize,
|
||||
Encrypt: params.Encrypt,
|
||||
RootDirectory: params.RootDirectory,
|
||||
zeros: make([]byte, params.ChunkSize),
|
||||
}
|
||||
|
||||
d.pool.New = func() interface{} {
|
||||
return make([]byte, d.ChunkSize)
|
||||
}
|
||||
|
||||
return &Driver{
|
||||
baseEmbed: baseEmbed{
|
||||
Base: base.Base{
|
||||
StorageDriver: d,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Implement the storagedriver.StorageDriver interface
|
||||
|
||||
func (d *driver) Name() string {
|
||||
return driverName
|
||||
}
|
||||
|
||||
// GetContent retrieves the content stored at "path" as a []byte.
|
||||
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
||||
content, err := d.Bucket.Get(d.ossPath(path))
|
||||
if err != nil {
|
||||
return nil, parseError(path, err)
|
||||
}
|
||||
return content, nil
|
||||
}
|
||||
|
||||
// PutContent stores the []byte content at a location designated by "path".
|
||||
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
||||
return parseError(path, d.Bucket.Put(d.ossPath(path), contents, d.getContentType(), getPermissions(), d.getOptions()))
|
||||
}
|
||||
|
||||
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
||||
// given byte offset.
|
||||
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||
headers := make(http.Header)
|
||||
headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
|
||||
|
||||
resp, err := d.Bucket.GetResponseWithHeaders(d.ossPath(path), headers)
|
||||
if err != nil {
|
||||
if ossErr, ok := err.(*oss.Error); ok && ossErr.Code == "InvalidRange" {
|
||||
return ioutil.NopCloser(bytes.NewReader(nil)), nil
|
||||
}
|
||||
|
||||
return nil, parseError(path, err)
|
||||
}
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
// WriteStream stores the contents of the provided io.Reader at a
|
||||
// location designated by the given path. The driver will know it has
|
||||
// received the full contents when the reader returns io.EOF. The number
|
||||
// of successfully READ bytes will be returned, even if an error is
|
||||
// returned. May be used to resume writing a stream by providing a nonzero
|
||||
// offset. Offsets past the current size will write from the position
|
||||
// beyond the end of the file.
|
||||
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
|
||||
partNumber := 1
|
||||
bytesRead := 0
|
||||
var putErrChan chan error
|
||||
parts := []oss.Part{}
|
||||
var part oss.Part
|
||||
done := make(chan struct{}) // stopgap to free up waiting goroutines
|
||||
|
||||
multi, err := d.Bucket.InitMulti(d.ossPath(path), d.getContentType(), getPermissions(), d.getOptions())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
buf := d.getbuf()
|
||||
|
||||
// We never want to leave a dangling multipart upload, our only consistent state is
|
||||
// when there is a whole object at path. This is in order to remain consistent with
|
||||
// the stat call.
|
||||
//
|
||||
// Note that if the machine dies before executing the defer, we will be left with a dangling
|
||||
// multipart upload, which will eventually be cleaned up, but we will lose all of the progress
|
||||
// made prior to the machine crashing.
|
||||
defer func() {
|
||||
if putErrChan != nil {
|
||||
if putErr := <-putErrChan; putErr != nil {
|
||||
err = putErr
|
||||
}
|
||||
}
|
||||
|
||||
if len(parts) > 0 {
|
||||
if multi == nil {
|
||||
// Parts should be empty if the multi is not initialized
|
||||
panic("Unreachable")
|
||||
} else {
|
||||
if multi.Complete(parts) != nil {
|
||||
multi.Abort()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
d.putbuf(buf) // needs to be here to pick up new buf value
|
||||
close(done) // free up any waiting goroutines
|
||||
}()
|
||||
|
||||
// Fills from 0 to total from current
|
||||
fromSmallCurrent := func(total int64) error {
|
||||
current, err := d.ReadStream(ctx, path, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bytesRead = 0
|
||||
for int64(bytesRead) < total {
|
||||
//The loop should very rarely enter a second iteration
|
||||
nn, err := current.Read(buf[bytesRead:total])
|
||||
bytesRead += nn
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fills from parameter to chunkSize from reader
|
||||
fromReader := func(from int64) error {
|
||||
bytesRead = 0
|
||||
for from+int64(bytesRead) < d.ChunkSize {
|
||||
nn, err := reader.Read(buf[from+int64(bytesRead):])
|
||||
totalRead += int64(nn)
|
||||
bytesRead += nn
|
||||
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if putErrChan == nil {
|
||||
putErrChan = make(chan error)
|
||||
} else {
|
||||
if putErr := <-putErrChan; putErr != nil {
|
||||
putErrChan = nil
|
||||
return putErr
|
||||
}
|
||||
}
|
||||
|
||||
go func(bytesRead int, from int64, buf []byte) {
|
||||
defer d.putbuf(buf) // this buffer gets dropped after this call
|
||||
|
||||
// DRAGONS(stevvooe): There are few things one might want to know
|
||||
// about this section. First, the putErrChan is expecting an error
|
||||
// and a nil or just a nil to come through the channel. This is
|
||||
// covered by the silly defer below. The other aspect is the OSS
|
||||
// retry backoff to deal with RequestTimeout errors. Even though
|
||||
// the underlying OSS library should handle it, it doesn't seem to
|
||||
// be part of the shouldRetry function (see denverdino/aliyungo/oss).
|
||||
defer func() {
|
||||
select {
|
||||
case putErrChan <- nil: // for some reason, we do this no matter what.
|
||||
case <-done:
|
||||
return // ensure we don't leak the goroutine
|
||||
}
|
||||
}()
|
||||
|
||||
if bytesRead <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
var part oss.Part
|
||||
|
||||
loop:
|
||||
for retries := 0; retries < 5; retries++ {
|
||||
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]))
|
||||
if err == nil {
|
||||
break // success!
|
||||
}
|
||||
|
||||
// NOTE(stevvooe): This retry code tries to only retry under
|
||||
// conditions where the OSS package does not. We may add oss
|
||||
// error codes to the below if we see others bubble up in the
|
||||
// application. Right now, the most troubling is
|
||||
// RequestTimeout, which seems to only triggered when a tcp
|
||||
// connection to OSS slows to a crawl. If the RequestTimeout
|
||||
// ends up getting added to the OSS library and we don't see
|
||||
// other errors, this retry loop can be removed.
|
||||
switch err := err.(type) {
|
||||
case *oss.Error:
|
||||
switch err.Code {
|
||||
case "RequestTimeout":
|
||||
// allow retries on only this error.
|
||||
default:
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
backoff := 100 * time.Millisecond * time.Duration(retries+1)
|
||||
logrus.Errorf("error putting part, retrying after %v: %v", err, backoff.String())
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logrus.Errorf("error putting part, aborting: %v", err)
|
||||
select {
|
||||
case putErrChan <- err:
|
||||
case <-done:
|
||||
return // don't leak the goroutine
|
||||
}
|
||||
}
|
||||
|
||||
// parts and partNumber are safe, because this function is the
|
||||
// only one modifying them and we force it to be executed
|
||||
// serially.
|
||||
parts = append(parts, part)
|
||||
partNumber++
|
||||
}(bytesRead, from, buf)
|
||||
|
||||
buf = d.getbuf() // use a new buffer for the next call
|
||||
return nil
|
||||
}
|
||||
|
||||
if offset > 0 {
|
||||
resp, err := d.Bucket.Head(d.ossPath(path), nil)
|
||||
if err != nil {
|
||||
if ossErr, ok := err.(*oss.Error); !ok || ossErr.Code != "NoSuchKey" {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
currentLength := int64(0)
|
||||
if err == nil {
|
||||
currentLength = resp.ContentLength
|
||||
}
|
||||
|
||||
if currentLength >= offset {
|
||||
if offset < d.ChunkSize {
|
||||
// chunkSize > currentLength >= offset
|
||||
if err = fromSmallCurrent(offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromReader(offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if totalRead+offset < d.ChunkSize {
|
||||
return totalRead, nil
|
||||
}
|
||||
} else {
|
||||
// currentLength >= offset >= chunkSize
|
||||
_, part, err = multi.PutPartCopy(partNumber,
|
||||
oss.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(offset-1, 10)},
|
||||
d.Bucket.Name+"/"+d.ossPath(path))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
parts = append(parts, part)
|
||||
partNumber++
|
||||
}
|
||||
} else {
|
||||
// Fills between parameters with 0s but only when to - from <= chunkSize
|
||||
fromZeroFillSmall := func(from, to int64) error {
|
||||
bytesRead = 0
|
||||
for from+int64(bytesRead) < to {
|
||||
nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to])
|
||||
bytesRead += nn
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fills between parameters with 0s, making new parts
|
||||
fromZeroFillLarge := func(from, to int64) error {
|
||||
bytesRead64 := int64(0)
|
||||
for to-(from+bytesRead64) >= d.ChunkSize {
|
||||
part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bytesRead64 += d.ChunkSize
|
||||
|
||||
parts = append(parts, part)
|
||||
partNumber++
|
||||
}
|
||||
|
||||
return fromZeroFillSmall(0, (to-from)%d.ChunkSize)
|
||||
}
|
||||
|
||||
// currentLength < offset
|
||||
if currentLength < d.ChunkSize {
|
||||
if offset < d.ChunkSize {
|
||||
// chunkSize > offset > currentLength
|
||||
if err = fromSmallCurrent(currentLength); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromZeroFillSmall(currentLength, offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromReader(offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if totalRead+offset < d.ChunkSize {
|
||||
return totalRead, nil
|
||||
}
|
||||
} else {
|
||||
// offset >= chunkSize > currentLength
|
||||
if err = fromSmallCurrent(currentLength); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf))
|
||||
if err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
parts = append(parts, part)
|
||||
partNumber++
|
||||
|
||||
//Zero fill from chunkSize up to offset, then some reader
|
||||
if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromReader(offset % d.ChunkSize); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if totalRead+(offset%d.ChunkSize) < d.ChunkSize {
|
||||
return totalRead, nil
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// offset > currentLength >= chunkSize
|
||||
_, part, err = multi.PutPartCopy(partNumber,
|
||||
oss.CopyOptions{},
|
||||
d.Bucket.Name+"/"+d.ossPath(path))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
parts = append(parts, part)
|
||||
partNumber++
|
||||
|
||||
//Zero fill from currentLength up to offset, then some reader
|
||||
if err = fromZeroFillLarge(currentLength, offset); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize {
|
||||
return totalRead, nil
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
if err = fromReader(0); err != nil {
|
||||
return totalRead, err
|
||||
}
|
||||
|
||||
if int64(bytesRead) < d.ChunkSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return totalRead, nil
|
||||
}
|
||||
|
||||
// Stat retrieves the FileInfo for the given path, including the current size
|
||||
// in bytes and the creation time.
|
||||
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
|
||||
listResponse, err := d.Bucket.List(d.ossPath(path), "", "", 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fi := storagedriver.FileInfoFields{
|
||||
Path: path,
|
||||
}
|
||||
|
||||
if len(listResponse.Contents) == 1 {
|
||||
if listResponse.Contents[0].Key != d.ossPath(path) {
|
||||
fi.IsDir = true
|
||||
} else {
|
||||
fi.IsDir = false
|
||||
fi.Size = listResponse.Contents[0].Size
|
||||
|
||||
timestamp, err := time.Parse(time.RFC3339Nano, listResponse.Contents[0].LastModified)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fi.ModTime = timestamp
|
||||
}
|
||||
} else if len(listResponse.CommonPrefixes) == 1 {
|
||||
fi.IsDir = true
|
||||
} else {
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
|
||||
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
||||
}
|
||||
|
||||
// List returns a list of the objects that are direct descendants of the given path.
|
||||
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
|
||||
if path != "/" && path[len(path)-1] != '/' {
|
||||
path = path + "/"
|
||||
}
|
||||
|
||||
// This is to cover for the cases when the rootDirectory of the driver is either "" or "/".
|
||||
// In those cases, there is no root prefix to replace and we must actually add a "/" to all
|
||||
// results in order to keep them as valid paths as recognized by storagedriver.PathRegexp
|
||||
prefix := ""
|
||||
if d.ossPath("") == "" {
|
||||
prefix = "/"
|
||||
}
|
||||
|
||||
listResponse, err := d.Bucket.List(d.ossPath(path), "/", "", listMax)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
files := []string{}
|
||||
directories := []string{}
|
||||
|
||||
for {
|
||||
for _, key := range listResponse.Contents {
|
||||
files = append(files, strings.Replace(key.Key, d.ossPath(""), prefix, 1))
|
||||
}
|
||||
|
||||
for _, commonPrefix := range listResponse.CommonPrefixes {
|
||||
directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.ossPath(""), prefix, 1))
|
||||
}
|
||||
|
||||
if listResponse.IsTruncated {
|
||||
listResponse, err = d.Bucket.List(d.ossPath(path), "/", listResponse.NextMarker, listMax)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return append(files, directories...), nil
|
||||
}
|
||||
|
||||
// Move moves an object stored at sourcePath to destPath, removing the original
|
||||
// object.
|
||||
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
||||
logrus.Infof("Move from %s to %s", d.Bucket.Path("/"+d.ossPath(sourcePath)), d.ossPath(destPath))
|
||||
/* This is terrible, but aws doesn't have an actual move. */
|
||||
_, err := d.Bucket.PutCopy(d.ossPath(destPath), getPermissions(),
|
||||
oss.CopyOptions{
|
||||
//Options: d.getOptions(),
|
||||
//ContentType: d.getContentType()
|
||||
},
|
||||
d.Bucket.Path(d.ossPath(sourcePath)))
|
||||
if err != nil {
|
||||
return parseError(sourcePath, err)
|
||||
}
|
||||
|
||||
return d.Delete(ctx, sourcePath)
|
||||
}
|
||||
|
||||
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
||||
func (d *driver) Delete(ctx context.Context, path string) error {
|
||||
listResponse, err := d.Bucket.List(d.ossPath(path), "", "", listMax)
|
||||
if err != nil || len(listResponse.Contents) == 0 {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
|
||||
ossObjects := make([]oss.Object, listMax)
|
||||
|
||||
for len(listResponse.Contents) > 0 {
|
||||
for index, key := range listResponse.Contents {
|
||||
ossObjects[index].Key = key.Key
|
||||
}
|
||||
|
||||
err := d.Bucket.DelMulti(oss.Delete{Quiet: false, Objects: ossObjects[0:len(listResponse.Contents)]})
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
listResponse, err = d.Bucket.List(d.ossPath(path), "", "", listMax)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
|
||||
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
|
||||
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
|
||||
methodString := "GET"
|
||||
method, ok := options["method"]
|
||||
if ok {
|
||||
methodString, ok = method.(string)
|
||||
if !ok || (methodString != "GET" && methodString != "HEAD") {
|
||||
return "", storagedriver.ErrUnsupportedMethod
|
||||
}
|
||||
}
|
||||
|
||||
expiresTime := time.Now().Add(20 * time.Minute)
|
||||
logrus.Infof("expiresTime: %d", expiresTime)
|
||||
|
||||
expires, ok := options["expiry"]
|
||||
if ok {
|
||||
et, ok := expires.(time.Time)
|
||||
if ok {
|
||||
expiresTime = et
|
||||
}
|
||||
}
|
||||
logrus.Infof("expiresTime: %d", expiresTime)
|
||||
testURL := d.Bucket.SignedURLWithMethod(methodString, d.ossPath(path), expiresTime, nil, nil)
|
||||
logrus.Infof("testURL: %s", testURL)
|
||||
return testURL, nil
|
||||
}
|
||||
|
||||
func (d *driver) ossPath(path string) string {
|
||||
return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
|
||||
}
|
||||
|
||||
// S3BucketKey returns the OSS bucket key for the given storage driver path.
|
||||
func (d *Driver) S3BucketKey(path string) string {
|
||||
return d.StorageDriver.(*driver).ossPath(path)
|
||||
}
|
||||
|
||||
func parseError(path string, err error) error {
|
||||
if ossErr, ok := err.(*oss.Error); ok && ossErr.Code == "NoSuchKey" {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func hasCode(err error, code string) bool {
|
||||
ossErr, ok := err.(*oss.Error)
|
||||
return ok && ossErr.Code == code
|
||||
}
|
||||
|
||||
func (d *driver) getOptions() oss.Options {
|
||||
return oss.Options{ServerSideEncryption: d.Encrypt}
|
||||
}
|
||||
|
||||
func getPermissions() oss.ACL {
|
||||
return oss.Private
|
||||
}
|
||||
|
||||
func (d *driver) getContentType() string {
|
||||
return "application/octet-stream"
|
||||
}
|
||||
|
||||
// getbuf returns a buffer from the driver's pool with length d.ChunkSize.
|
||||
func (d *driver) getbuf() []byte {
|
||||
return d.pool.Get().([]byte)
|
||||
}
|
||||
|
||||
func (d *driver) putbuf(p []byte) {
|
||||
copy(p, d.zeros)
|
||||
d.pool.Put(p)
|
||||
}
|
152
registry/storage/driver/oss/oss_test.go
Executable file
152
registry/storage/driver/oss/oss_test.go
Executable file
|
@ -0,0 +1,152 @@
|
|||
package oss
|
||||
|
||||
import (
|
||||
alioss "github.com/denverdino/aliyungo/oss"
|
||||
"github.com/docker/distribution/context"
|
||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||
"github.com/docker/distribution/registry/storage/driver/testsuites"
|
||||
"io/ioutil"
|
||||
//"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
// Hook up gocheck into the "go test" runner.
|
||||
func Test(t *testing.T) { check.TestingT(t) }
|
||||
|
||||
type OSSDriverConstructor func(rootDirectory string) (*Driver, error)
|
||||
|
||||
func init() {
|
||||
accessKey := os.Getenv("ALIYUN_ACCESS_KEY_ID")
|
||||
secretKey := os.Getenv("ALIYUN_ACCESS_KEY_SECRET")
|
||||
bucket := os.Getenv("OSS_BUCKET")
|
||||
region := os.Getenv("OSS_REGION")
|
||||
internal := os.Getenv("OSS_INTERNAL")
|
||||
encrypt := os.Getenv("OSS_ENCRYPT")
|
||||
secure := os.Getenv("OSS_SECURE")
|
||||
root, err := ioutil.TempDir("", "driver-")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer os.Remove(root)
|
||||
|
||||
ossDriverConstructor := func(rootDirectory string) (*Driver, error) {
|
||||
encryptBool := false
|
||||
if encrypt != "" {
|
||||
encryptBool, err = strconv.ParseBool(encrypt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
secureBool := false
|
||||
if secure != "" {
|
||||
secureBool, err = strconv.ParseBool(secure)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
internalBool := false
|
||||
if internal != "" {
|
||||
internalBool, err = strconv.ParseBool(internal)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
parameters := DriverParameters{
|
||||
accessKey,
|
||||
secretKey,
|
||||
bucket,
|
||||
alioss.Region(region),
|
||||
internalBool,
|
||||
encryptBool,
|
||||
secureBool,
|
||||
minChunkSize,
|
||||
rootDirectory,
|
||||
}
|
||||
|
||||
return New(parameters)
|
||||
}
|
||||
|
||||
// Skip OSS storage driver tests if environment variable parameters are not provided
|
||||
skipCheck := func() string {
|
||||
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
|
||||
return "Must set ALIYUN_ACCESS_KEY_ID, ALIYUN_ACCESS_KEY_SECRET, OSS_REGION, OSS_BUCKET, and OSS_ENCRYPT to run OSS tests"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
driverConstructor := func() (storagedriver.StorageDriver, error) {
|
||||
return ossDriverConstructor(root)
|
||||
}
|
||||
|
||||
testsuites.RegisterInProcessSuite(driverConstructor, skipCheck)
|
||||
|
||||
// ossConstructor := func() (*Driver, error) {
|
||||
// return ossDriverConstructor(aws.GetRegion(region))
|
||||
// }
|
||||
|
||||
RegisterOSSDriverSuite(ossDriverConstructor, skipCheck)
|
||||
|
||||
// testsuites.RegisterIPCSuite(driverName, map[string]string{
|
||||
// "accesskey": accessKey,
|
||||
// "secretkey": secretKey,
|
||||
// "region": region.Name,
|
||||
// "bucket": bucket,
|
||||
// "encrypt": encrypt,
|
||||
// }, skipCheck)
|
||||
// }
|
||||
}
|
||||
|
||||
func RegisterOSSDriverSuite(ossDriverConstructor OSSDriverConstructor, skipCheck testsuites.SkipCheck) {
|
||||
check.Suite(&OSSDriverSuite{
|
||||
Constructor: ossDriverConstructor,
|
||||
SkipCheck: skipCheck,
|
||||
})
|
||||
}
|
||||
|
||||
type OSSDriverSuite struct {
|
||||
Constructor OSSDriverConstructor
|
||||
testsuites.SkipCheck
|
||||
}
|
||||
|
||||
func (suite *OSSDriverSuite) SetUpSuite(c *check.C) {
|
||||
if reason := suite.SkipCheck(); reason != "" {
|
||||
c.Skip(reason)
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *OSSDriverSuite) TestEmptyRootList(c *check.C) {
|
||||
validRoot, err := ioutil.TempDir("", "driver-")
|
||||
c.Assert(err, check.IsNil)
|
||||
defer os.Remove(validRoot)
|
||||
|
||||
rootedDriver, err := suite.Constructor(validRoot)
|
||||
c.Assert(err, check.IsNil)
|
||||
emptyRootDriver, err := suite.Constructor("")
|
||||
c.Assert(err, check.IsNil)
|
||||
slashRootDriver, err := suite.Constructor("/")
|
||||
c.Assert(err, check.IsNil)
|
||||
|
||||
filename := "/test"
|
||||
contents := []byte("contents")
|
||||
ctx := context.Background()
|
||||
err = rootedDriver.PutContent(ctx, filename, contents)
|
||||
c.Assert(err, check.IsNil)
|
||||
defer rootedDriver.Delete(ctx, filename)
|
||||
|
||||
keys, err := emptyRootDriver.List(ctx, "/")
|
||||
for _, path := range keys {
|
||||
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
|
||||
}
|
||||
|
||||
keys, err = slashRootDriver.List(ctx, "/")
|
||||
for _, path := range keys {
|
||||
c.Assert(storagedriver.PathRegexp.MatchString(path), check.Equals, true)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue