forked from TrueCloudLab/distribution
Fix azure storagedriver methods, implement Stat, URLFor
Signed-off-by: Ahmet Alp Balkan <ahmetalpbalkan@gmail.com>
This commit is contained in:
parent
bdc268bca3
commit
bc42f53ec8
3 changed files with 136 additions and 40 deletions
|
@ -14,7 +14,7 @@ import (
|
||||||
// An out-of-process Azure Storage driver, intended to be run by ipc.NewDriverClient
|
// An out-of-process Azure Storage driver, intended to be run by ipc.NewDriverClient
|
||||||
func main() {
|
func main() {
|
||||||
parametersBytes := []byte(os.Args[1])
|
parametersBytes := []byte(os.Args[1])
|
||||||
var parameters map[string]string
|
var parameters map[string]interface{}
|
||||||
err := json.Unmarshal(parametersBytes, ¶meters)
|
err := json.Unmarshal(parametersBytes, ¶meters)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
// +build ignore
|
|
||||||
|
|
||||||
// Package azure provides a storagedriver.StorageDriver implementation to
|
// Package azure provides a storagedriver.StorageDriver implementation to
|
||||||
// store blobs in Microsoft Azure Blob Storage Service.
|
// store blobs in Microsoft Azure Blob Storage Service.
|
||||||
package azure
|
package azure
|
||||||
|
@ -10,8 +8,10 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/docker/distribution/storagedriver"
|
"github.com/docker/distribution/storagedriver"
|
||||||
"github.com/docker/distribution/storagedriver/factory"
|
"github.com/docker/distribution/storagedriver/factory"
|
||||||
|
@ -40,28 +40,28 @@ func init() {
|
||||||
|
|
||||||
type azureDriverFactory struct{}
|
type azureDriverFactory struct{}
|
||||||
|
|
||||||
func (factory *azureDriverFactory) Create(parameters map[string]string) (storagedriver.StorageDriver, error) {
|
func (factory *azureDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||||
return FromParameters(parameters)
|
return FromParameters(parameters)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FromParameters constructs a new Driver with a given parameters map.
|
// FromParameters constructs a new Driver with a given parameters map.
|
||||||
func FromParameters(parameters map[string]string) (*Driver, error) {
|
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
||||||
accountName, ok := parameters[paramAccountName]
|
accountName, ok := parameters[paramAccountName]
|
||||||
if !ok {
|
if !ok || fmt.Sprint(accountName) == "" {
|
||||||
return nil, fmt.Errorf("No %s parameter provided", paramAccountName)
|
return nil, fmt.Errorf("No %s parameter provided", paramAccountName)
|
||||||
}
|
}
|
||||||
|
|
||||||
accountKey, ok := parameters[paramAccountKey]
|
accountKey, ok := parameters[paramAccountKey]
|
||||||
if !ok {
|
if !ok || fmt.Sprint(accountKey) == "" {
|
||||||
return nil, fmt.Errorf("No %s parameter provided", paramAccountKey)
|
return nil, fmt.Errorf("No %s parameter provided", paramAccountKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
container, ok := parameters[paramContainer]
|
container, ok := parameters[paramContainer]
|
||||||
if !ok {
|
if !ok || fmt.Sprint(container) == "" {
|
||||||
return nil, fmt.Errorf("No %s parameter provided", paramContainer)
|
return nil, fmt.Errorf("No %s parameter provided", paramContainer)
|
||||||
}
|
}
|
||||||
|
|
||||||
return New(accountName, accountKey, container)
|
return New(fmt.Sprint(accountName), fmt.Sprint(accountKey), fmt.Sprint(container))
|
||||||
}
|
}
|
||||||
|
|
||||||
// New constructs a new Driver with the given Azure Storage Account credentials
|
// New constructs a new Driver with the given Azure Storage Account credentials
|
||||||
|
@ -87,6 +87,10 @@ func New(accountName, accountKey, container string) (*Driver, error) {
|
||||||
|
|
||||||
// GetContent retrieves the content stored at "path" as a []byte.
|
// GetContent retrieves the content stored at "path" as a []byte.
|
||||||
func (d *Driver) GetContent(path string) ([]byte, error) {
|
func (d *Driver) GetContent(path string) ([]byte, error) {
|
||||||
|
if !storagedriver.PathRegexp.MatchString(path) {
|
||||||
|
return nil, storagedriver.InvalidPathError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
blob, err := d.client.GetBlob(d.container, path)
|
blob, err := d.client.GetBlob(d.container, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if is404(err) {
|
if is404(err) {
|
||||||
|
@ -100,25 +104,29 @@ func (d *Driver) GetContent(path string) ([]byte, error) {
|
||||||
|
|
||||||
// PutContent stores the []byte content at a location designated by "path".
|
// PutContent stores the []byte content at a location designated by "path".
|
||||||
func (d *Driver) PutContent(path string, contents []byte) error {
|
func (d *Driver) PutContent(path string, contents []byte) error {
|
||||||
|
if !storagedriver.PathRegexp.MatchString(path) {
|
||||||
|
return storagedriver.InvalidPathError{Path: path}
|
||||||
|
}
|
||||||
return d.client.PutBlockBlob(d.container, path, ioutil.NopCloser(bytes.NewReader(contents)))
|
return d.client.PutBlockBlob(d.container, path, ioutil.NopCloser(bytes.NewReader(contents)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
||||||
// given byte offset.
|
// given byte offset.
|
||||||
func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
|
func (d *driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
|
||||||
if ok, err := d.client.BlobExists(d.container, path); err != nil {
|
if ok, err := d.client.BlobExists(d.container, path); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if !ok {
|
} else if !ok {
|
||||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
}
|
||||||
|
|
||||||
size, err := d.CurrentSize(path)
|
info, err := d.client.GetBlobProperties(d.container, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if offset >= int64(size) {
|
size := int64(info.ContentLength)
|
||||||
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
if offset >= size {
|
||||||
|
return ioutil.NopCloser(bytes.NewReader(nil)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
bytesRange := fmt.Sprintf("%v-", offset)
|
bytesRange := fmt.Sprintf("%v-", offset)
|
||||||
|
@ -131,7 +139,11 @@ func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
|
||||||
|
|
||||||
// WriteStream stores the contents of the provided io.ReadCloser at a location
|
// WriteStream stores the contents of the provided io.ReadCloser at a location
|
||||||
// designated by the given path.
|
// designated by the given path.
|
||||||
func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
|
func (d *Driver) WriteStream(path string, offset int64, reader io.Reader) (int64, error) {
|
||||||
|
if !storagedriver.PathRegexp.MatchString(path) {
|
||||||
|
return 0, storagedriver.InvalidPathError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
lastBlockNum int
|
lastBlockNum int
|
||||||
resumableOffset int64
|
resumableOffset int64
|
||||||
|
@ -139,20 +151,20 @@ func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadClos
|
||||||
)
|
)
|
||||||
|
|
||||||
if blobExists, err := d.client.BlobExists(d.container, path); err != nil {
|
if blobExists, err := d.client.BlobExists(d.container, path); err != nil {
|
||||||
return err
|
return 0, err
|
||||||
} else if !blobExists { // new blob
|
} else if !blobExists { // new blob
|
||||||
lastBlockNum = 0
|
lastBlockNum = 0
|
||||||
resumableOffset = 0
|
resumableOffset = 0
|
||||||
} else { // append
|
} else { // append
|
||||||
if parts, err := d.client.GetBlockList(d.container, path, azure.BlockListTypeCommitted); err != nil {
|
if parts, err := d.client.GetBlockList(d.container, path, azure.BlockListTypeCommitted); err != nil {
|
||||||
return err
|
return 0, err
|
||||||
} else if len(parts.CommittedBlocks) == 0 {
|
} else if len(parts.CommittedBlocks) == 0 {
|
||||||
lastBlockNum = 0
|
lastBlockNum = 0
|
||||||
resumableOffset = 0
|
resumableOffset = 0
|
||||||
} else {
|
} else {
|
||||||
lastBlock := parts.CommittedBlocks[len(parts.CommittedBlocks)-1]
|
lastBlock := parts.CommittedBlocks[len(parts.CommittedBlocks)-1]
|
||||||
if lastBlockNum, err = blockNum(lastBlock.Name); err != nil {
|
if lastBlockNum, err = fromBlockID(lastBlock.Name); err != nil {
|
||||||
return fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error())
|
return 0, fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
var totalSize int64
|
var totalSize int64
|
||||||
|
@ -174,11 +186,17 @@ func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadClos
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if offset != resumableOffset {
|
if offset < resumableOffset {
|
||||||
return storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
// only writing at the end or after the end of the file is supported
|
||||||
|
return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
|
||||||
|
} else if offset > resumableOffset {
|
||||||
|
// zero-fill in between, construct a multi-reader
|
||||||
|
zeroReader := bytes.NewReader(make([]byte, offset-resumableOffset))
|
||||||
|
reader = io.MultiReader(zeroReader, reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put content
|
// Put content
|
||||||
|
var nn int64
|
||||||
buf := make([]byte, azure.MaxBlobBlockSize)
|
buf := make([]byte, azure.MaxBlobBlockSize)
|
||||||
for {
|
for {
|
||||||
// Read chunks of exactly size N except the last chunk to
|
// Read chunks of exactly size N except the last chunk to
|
||||||
|
@ -187,35 +205,89 @@ func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadClos
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
nn += int64(n)
|
||||||
|
|
||||||
data := buf[:n]
|
data := buf[:n]
|
||||||
blockID := toBlockID(lastBlockNum + 1)
|
lastBlockNum++
|
||||||
|
blockID := toBlockID(lastBlockNum)
|
||||||
if err = d.client.PutBlock(d.container, path, blockID, data); err != nil {
|
if err = d.client.PutBlock(d.container, path, blockID, data); err != nil {
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
blocks = append(blocks, azure.Block{
|
blocks = append(blocks, azure.Block{
|
||||||
Id: blockID,
|
Id: blockID,
|
||||||
Status: azure.BlockStatusLatest})
|
Status: azure.BlockStatusLatest})
|
||||||
lastBlockNum++
|
}
|
||||||
|
|
||||||
|
// If there was a zero-fill, adjust nn to exclude zeros
|
||||||
|
if offset > resumableOffset {
|
||||||
|
nn -= offset - resumableOffset
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit block list
|
// Commit block list
|
||||||
return d.client.PutBlockList(d.container, path, blocks)
|
return nn, d.client.PutBlockList(d.container, path, blocks)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CurrentSize retrieves the curernt size in bytes of the object at the given
|
// Stat retrieves the FileInfo for the given path, including the current size
|
||||||
// path.
|
// in bytes and the creation time.
|
||||||
func (d *Driver) CurrentSize(path string) (uint64, error) {
|
func (d *Driver) Stat(path string) (storagedriver.FileInfo, error) {
|
||||||
props, err := d.client.GetBlobProperties(d.container, path)
|
if !storagedriver.PathRegexp.MatchString(path) {
|
||||||
if err != nil {
|
return nil, storagedriver.InvalidPathError{Path: path}
|
||||||
return 0, err
|
|
||||||
}
|
}
|
||||||
return props.ContentLength, nil
|
|
||||||
|
// Check if the path is a blob
|
||||||
|
if ok, err := d.client.BlobExists(d.container, path); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if ok {
|
||||||
|
blob, err := d.client.GetBlobProperties(d.container, path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
mtim, err := time.Parse(http.TimeFormat, blob.LastModified)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
|
||||||
|
Path: path,
|
||||||
|
Size: int64(blob.ContentLength),
|
||||||
|
ModTime: mtim,
|
||||||
|
IsDir: false,
|
||||||
|
}}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if path is a virtual container
|
||||||
|
virtContainerPath := path
|
||||||
|
if !strings.HasSuffix(virtContainerPath, "/") {
|
||||||
|
virtContainerPath += "/"
|
||||||
|
}
|
||||||
|
blobs, err := d.client.ListBlobs(d.container, azure.ListBlobsParameters{
|
||||||
|
Prefix: virtContainerPath,
|
||||||
|
MaxResults: 1,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(blobs.Blobs) > 0 {
|
||||||
|
// path is a virtual container
|
||||||
|
return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
|
||||||
|
Path: path,
|
||||||
|
IsDir: true,
|
||||||
|
}}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// path is not a blob or virtual container
|
||||||
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns a list of the objects that are direct descendants of the given
|
// List returns a list of the objects that are direct descendants of the given
|
||||||
// path.
|
// path.
|
||||||
func (d *Driver) List(path string) ([]string, error) {
|
func (d *Driver) List(path string) ([]string, error) {
|
||||||
|
if !storagedriver.PathRegexp.MatchString(path) && path != "/" {
|
||||||
|
return nil, storagedriver.InvalidPathError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
if path == "/" {
|
if path == "/" {
|
||||||
path = ""
|
path = ""
|
||||||
}
|
}
|
||||||
|
@ -232,6 +304,12 @@ func (d *Driver) List(path string) ([]string, error) {
|
||||||
// Move moves an object stored at sourcePath to destPath, removing the original
|
// Move moves an object stored at sourcePath to destPath, removing the original
|
||||||
// object.
|
// object.
|
||||||
func (d *Driver) Move(sourcePath string, destPath string) error {
|
func (d *Driver) Move(sourcePath string, destPath string) error {
|
||||||
|
if !storagedriver.PathRegexp.MatchString(sourcePath) {
|
||||||
|
return storagedriver.InvalidPathError{Path: sourcePath}
|
||||||
|
} else if !storagedriver.PathRegexp.MatchString(destPath) {
|
||||||
|
return storagedriver.InvalidPathError{Path: destPath}
|
||||||
|
}
|
||||||
|
|
||||||
sourceBlobURL := d.client.GetBlobUrl(d.container, sourcePath)
|
sourceBlobURL := d.client.GetBlobUrl(d.container, sourcePath)
|
||||||
err := d.client.CopyBlob(d.container, destPath, sourceBlobURL)
|
err := d.client.CopyBlob(d.container, destPath, sourceBlobURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -246,6 +324,10 @@ func (d *Driver) Move(sourcePath string, destPath string) error {
|
||||||
|
|
||||||
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
||||||
func (d *Driver) Delete(path string) error {
|
func (d *Driver) Delete(path string) error {
|
||||||
|
if !storagedriver.PathRegexp.MatchString(path) {
|
||||||
|
return storagedriver.InvalidPathError{Path: path}
|
||||||
|
}
|
||||||
|
|
||||||
ok, err := d.client.DeleteBlobIfExists(d.container, path)
|
ok, err := d.client.DeleteBlobIfExists(d.container, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -272,6 +354,21 @@ func (d *Driver) Delete(path string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// URLFor returns a publicly accessible URL for the blob stored at given path
|
||||||
|
// for specified duration by making use of Azure Storage Shared Access Signatures (SAS).
|
||||||
|
// See https://msdn.microsoft.com/en-us/library/azure/ee395415.aspx for more info.
|
||||||
|
func (d *driver) URLFor(path string, options map[string]interface{}) (string, error) {
|
||||||
|
expiresTime := time.Now().UTC().Add(20 * time.Minute) // default expiration
|
||||||
|
expires, ok := options["expiry"]
|
||||||
|
if ok {
|
||||||
|
t, ok := expires.(time.Time)
|
||||||
|
if ok {
|
||||||
|
expiresTime = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return d.client.GetBlobSASURI(d.container, path, expiresTime, "r")
|
||||||
|
}
|
||||||
|
|
||||||
// directDescendants will find direct descendants (blobs or virtual containers)
|
// directDescendants will find direct descendants (blobs or virtual containers)
|
||||||
// of from list of blob paths and will return their full paths. Elements in blobs
|
// of from list of blob paths and will return their full paths. Elements in blobs
|
||||||
// list must be prefixed with a "/" and
|
// list must be prefixed with a "/" and
|
||||||
|
@ -340,7 +437,7 @@ func is404(err error) bool {
|
||||||
return ok && e.StatusCode == 404
|
return ok && e.StatusCode == 404
|
||||||
}
|
}
|
||||||
|
|
||||||
func blockNum(b64Name string) (int, error) {
|
func fromBlockID(b64Name string) (int, error) {
|
||||||
s, err := base64.StdEncoding.DecodeString(b64Name)
|
s, err := base64.StdEncoding.DecodeString(b64Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
@ -350,5 +447,6 @@ func blockNum(b64Name string) (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func toBlockID(i int) string {
|
func toBlockID(i int) string {
|
||||||
return base64.StdEncoding.EncodeToString([]byte(strconv.Itoa(i)))
|
s := fmt.Sprintf("%010d", i) // add zero padding
|
||||||
|
return base64.StdEncoding.EncodeToString([]byte(s))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
// +build ignore
|
|
||||||
|
|
||||||
package azure
|
package azure
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -59,9 +57,9 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
testsuites.RegisterInProcessSuite(azureDriverConstructor, skipCheck)
|
testsuites.RegisterInProcessSuite(azureDriverConstructor, skipCheck)
|
||||||
testsuites.RegisterIPCSuite(driverName, map[string]string{
|
// testsuites.RegisterIPCSuite(driverName, map[string]string{
|
||||||
paramAccountName: accountName,
|
// paramAccountName: accountName,
|
||||||
paramAccountKey: accountKey,
|
// paramAccountKey: accountKey,
|
||||||
paramContainer: container,
|
// paramContainer: container,
|
||||||
}, skipCheck)
|
// }, skipCheck)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue