distribution/registry/storage/driver/azure/azure.go
Ahmet Alp Balkan 78d0660319
azure: fix race condition in PutContent()
See #2077 for background.

The PR #1438 which was not reviewed by azure folks basically introduced
a race condition around uploads to the same blob by multiple clients
concurrently as it used the "writer" type for PutContent(), introduced in #1438.
This does chunked upload of blobs using "AppendBlob" type, which was not atomic.

Usage of "writer" type and thus AppendBlobs on metadata files is currently not
concurrency-safe and generally, they are not the right type of blob for the job.

This patch fixes PutContent() to use the atomic upload operation that works
for uploads smaller than 64 MB and creates blobs with "BlockBlob" type. To be
backwards compatible, we query the type of the blob first and if it is not
a "BlockBlob" we delete the blob first before doing an atomic PUT. This
creates a small inconsistency/race window "only once". Once the blob is made
"BlockBlob", it is overwritten with a single PUT atomicallly next time.

Therefore, going forward, PutContent() will be producing BlockBlobs and it
will silently migrate the AppendBlobs introduced in #1438 to BlockBlobs with
this patch.

Tested with existing code side by side, both registries with and without this
patch work fine without breaking each other. So this should be good from a
backwards/forward compatiblity perspective, with a cost of doing an extra
HEAD checking the blob type.

Fixes #2077.

Signed-off-by: Ahmet Alp Balkan <ahmetalpbalkan@gmail.com>
2016-11-30 12:40:43 -08:00

505 lines
14 KiB
Go

// Package azure provides a storagedriver.StorageDriver implementation to
// store blobs in Microsoft Azure Blob Storage Service.
package azure
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/base"
"github.com/docker/distribution/registry/storage/driver/factory"
azure "github.com/Azure/azure-sdk-for-go/storage"
)
const driverName = "azure"
const (
paramAccountName = "accountname"
paramAccountKey = "accountkey"
paramContainer = "container"
paramRealm = "realm"
maxChunkSize = 4 * 1024 * 1024
)
type driver struct {
client azure.BlobStorageClient
container string
}
type baseEmbed struct{ base.Base }
// Driver is a storagedriver.StorageDriver implementation backed by
// Microsoft Azure Blob Storage Service.
type Driver struct{ baseEmbed }
func init() {
factory.Register(driverName, &azureDriverFactory{})
}
type azureDriverFactory struct{}
func (factory *azureDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return FromParameters(parameters)
}
// FromParameters constructs a new Driver with a given parameters map.
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
accountName, ok := parameters[paramAccountName]
if !ok || fmt.Sprint(accountName) == "" {
return nil, fmt.Errorf("No %s parameter provided", paramAccountName)
}
accountKey, ok := parameters[paramAccountKey]
if !ok || fmt.Sprint(accountKey) == "" {
return nil, fmt.Errorf("No %s parameter provided", paramAccountKey)
}
container, ok := parameters[paramContainer]
if !ok || fmt.Sprint(container) == "" {
return nil, fmt.Errorf("No %s parameter provided", paramContainer)
}
realm, ok := parameters[paramRealm]
if !ok || fmt.Sprint(realm) == "" {
realm = azure.DefaultBaseURL
}
return New(fmt.Sprint(accountName), fmt.Sprint(accountKey), fmt.Sprint(container), fmt.Sprint(realm))
}
// New constructs a new Driver with the given Azure Storage Account credentials
func New(accountName, accountKey, container, realm string) (*Driver, error) {
api, err := azure.NewClient(accountName, accountKey, realm, azure.DefaultAPIVersion, true)
if err != nil {
return nil, err
}
blobClient := api.GetBlobService()
// Create registry container
if _, err = blobClient.CreateContainerIfNotExists(container, azure.ContainerAccessTypePrivate); err != nil {
return nil, err
}
d := &driver{
client: blobClient,
container: container}
return &Driver{baseEmbed: baseEmbed{Base: base.Base{StorageDriver: d}}}, nil
}
// Implement the storagedriver.StorageDriver interface.
func (d *driver) Name() string {
return driverName
}
// GetContent retrieves the content stored at "path" as a []byte.
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
blob, err := d.client.GetBlob(d.container, path)
if err != nil {
if is404(err) {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return nil, err
}
defer blob.Close()
return ioutil.ReadAll(blob)
}
// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
if limit := 64 * 1024 * 1024; len(contents) > limit { // max size for block blobs uploaded via single "Put Blob"
return fmt.Errorf("uploading %d bytes with PutContent is not supported; limit: %d bytes", len(contents), limit)
}
// Historically, blobs uploaded via PutContent used to be of type AppendBlob
// (https://github.com/docker/distribution/pull/1438). We can't replace
// these blobs atomically via a single "Put Blob" operation without
// deleting them first. Once we detect they are BlockBlob type, we can
// overwrite them with an atomically "Put Blob" operation.
//
// While we delete the blob and create a new one, there will be a small
// window of inconsistency and if the Put Blob fails, we may end up with
// losing the existing data while migrating it to BlockBlob type. However,
// expectation is the clients pushing will be retrying when they get an error
// response.
props, err := d.client.GetBlobProperties(d.container, path)
if err != nil && !is404(err) {
return fmt.Errorf("failed to get blob properties: %v", err)
}
if err == nil && props.BlobType != azure.BlobTypeBlock {
if err := d.client.DeleteBlob(d.container, path, nil); err != nil {
return fmt.Errorf("failed to delete legacy blob (%s): %v", props.BlobType, err)
}
}
r := bytes.NewReader(contents)
return d.client.CreateBlockBlobFromReader(d.container, path, uint64(len(contents)), r, nil)
}
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
if ok, err := d.client.BlobExists(d.container, path); err != nil {
return nil, err
} else if !ok {
return nil, storagedriver.PathNotFoundError{Path: path}
}
info, err := d.client.GetBlobProperties(d.container, path)
if err != nil {
return nil, err
}
size := int64(info.ContentLength)
if offset >= size {
return ioutil.NopCloser(bytes.NewReader(nil)), nil
}
bytesRange := fmt.Sprintf("%v-", offset)
resp, err := d.client.GetBlobRange(d.container, path, bytesRange, nil)
if err != nil {
return nil, err
}
return resp, nil
}
// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
blobExists, err := d.client.BlobExists(d.container, path)
if err != nil {
return nil, err
}
var size int64
if blobExists {
if append {
blobProperties, err := d.client.GetBlobProperties(d.container, path)
if err != nil {
return nil, err
}
size = blobProperties.ContentLength
} else {
err := d.client.DeleteBlob(d.container, path, nil)
if err != nil {
return nil, err
}
}
} else {
if append {
return nil, storagedriver.PathNotFoundError{Path: path}
}
err := d.client.PutAppendBlob(d.container, path, nil)
if err != nil {
return nil, err
}
}
return d.newWriter(path, size), nil
}
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
// Check if the path is a blob
if ok, err := d.client.BlobExists(d.container, path); err != nil {
return nil, err
} else if ok {
blob, err := d.client.GetBlobProperties(d.container, path)
if err != nil {
return nil, err
}
mtim, err := time.Parse(http.TimeFormat, blob.LastModified)
if err != nil {
return nil, err
}
return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
Path: path,
Size: int64(blob.ContentLength),
ModTime: mtim,
IsDir: false,
}}, nil
}
// Check if path is a virtual container
virtContainerPath := path
if !strings.HasSuffix(virtContainerPath, "/") {
virtContainerPath += "/"
}
blobs, err := d.client.ListBlobs(d.container, azure.ListBlobsParameters{
Prefix: virtContainerPath,
MaxResults: 1,
})
if err != nil {
return nil, err
}
if len(blobs.Blobs) > 0 {
// path is a virtual container
return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
Path: path,
IsDir: true,
}}, nil
}
// path is not a blob or virtual container
return nil, storagedriver.PathNotFoundError{Path: path}
}
// List returns a list of the objects that are direct descendants of the given
// path.
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
if path == "/" {
path = ""
}
blobs, err := d.listBlobs(d.container, path)
if err != nil {
return blobs, err
}
list := directDescendants(blobs, path)
if path != "" && len(list) == 0 {
return nil, storagedriver.PathNotFoundError{Path: path}
}
return list, nil
}
// Move moves an object stored at sourcePath to destPath, removing the original
// object.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
sourceBlobURL := d.client.GetBlobURL(d.container, sourcePath)
err := d.client.CopyBlob(d.container, destPath, sourceBlobURL)
if err != nil {
if is404(err) {
return storagedriver.PathNotFoundError{Path: sourcePath}
}
return err
}
return d.client.DeleteBlob(d.container, sourcePath, nil)
}
// Delete recursively deletes all objects stored at "path" and its subpaths.
func (d *driver) Delete(ctx context.Context, path string) error {
ok, err := d.client.DeleteBlobIfExists(d.container, path, nil)
if err != nil {
return err
}
if ok {
return nil // was a blob and deleted, return
}
// Not a blob, see if path is a virtual container with blobs
blobs, err := d.listBlobs(d.container, path)
if err != nil {
return err
}
for _, b := range blobs {
if err = d.client.DeleteBlob(d.container, b, nil); err != nil {
return err
}
}
if len(blobs) == 0 {
return storagedriver.PathNotFoundError{Path: path}
}
return nil
}
// URLFor returns a publicly accessible URL for the blob stored at given path
// for specified duration by making use of Azure Storage Shared Access Signatures (SAS).
// See https://msdn.microsoft.com/en-us/library/azure/ee395415.aspx for more info.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
expiresTime := time.Now().UTC().Add(20 * time.Minute) // default expiration
expires, ok := options["expiry"]
if ok {
t, ok := expires.(time.Time)
if ok {
expiresTime = t
}
}
return d.client.GetBlobSASURI(d.container, path, expiresTime, "r")
}
// directDescendants will find direct descendants (blobs or virtual containers)
// of from list of blob paths and will return their full paths. Elements in blobs
// list must be prefixed with a "/" and
//
// Example: direct descendants of "/" in {"/foo", "/bar/1", "/bar/2"} is
// {"/foo", "/bar"} and direct descendants of "bar" is {"/bar/1", "/bar/2"}
func directDescendants(blobs []string, prefix string) []string {
if !strings.HasPrefix(prefix, "/") { // add trailing '/'
prefix = "/" + prefix
}
if !strings.HasSuffix(prefix, "/") { // containerify the path
prefix += "/"
}
out := make(map[string]bool)
for _, b := range blobs {
if strings.HasPrefix(b, prefix) {
rel := b[len(prefix):]
c := strings.Count(rel, "/")
if c == 0 {
out[b] = true
} else {
out[prefix+rel[:strings.Index(rel, "/")]] = true
}
}
}
var keys []string
for k := range out {
keys = append(keys, k)
}
return keys
}
func (d *driver) listBlobs(container, virtPath string) ([]string, error) {
if virtPath != "" && !strings.HasSuffix(virtPath, "/") { // containerify the path
virtPath += "/"
}
out := []string{}
marker := ""
for {
resp, err := d.client.ListBlobs(d.container, azure.ListBlobsParameters{
Marker: marker,
Prefix: virtPath,
})
if err != nil {
return out, err
}
for _, b := range resp.Blobs {
out = append(out, b.Name)
}
if len(resp.Blobs) == 0 || resp.NextMarker == "" {
break
}
marker = resp.NextMarker
}
return out, nil
}
func is404(err error) bool {
// handle the case when the request was a HEAD and service error could not
// be parsed, such as "storage: service returned without a response body
// (404 The specified blob does not exist.)"
if strings.Contains(fmt.Sprintf("%v", err), "404 The specified blob does not exist") {
return true
}
// common case
statusCodeErr, ok := err.(azure.AzureStorageServiceError)
return ok && statusCodeErr.StatusCode == http.StatusNotFound
}
type writer struct {
driver *driver
path string
size int64
bw *bufio.Writer
closed bool
committed bool
cancelled bool
}
func (d *driver) newWriter(path string, size int64) storagedriver.FileWriter {
return &writer{
driver: d,
path: path,
size: size,
bw: bufio.NewWriterSize(&blockWriter{
client: d.client,
container: d.container,
path: path,
}, maxChunkSize),
}
}
func (w *writer) Write(p []byte) (int, error) {
if w.closed {
return 0, fmt.Errorf("already closed")
} else if w.committed {
return 0, fmt.Errorf("already committed")
} else if w.cancelled {
return 0, fmt.Errorf("already cancelled")
}
n, err := w.bw.Write(p)
w.size += int64(n)
return n, err
}
func (w *writer) Size() int64 {
return w.size
}
func (w *writer) Close() error {
if w.closed {
return fmt.Errorf("already closed")
}
w.closed = true
return w.bw.Flush()
}
func (w *writer) Cancel() error {
if w.closed {
return fmt.Errorf("already closed")
} else if w.committed {
return fmt.Errorf("already committed")
}
w.cancelled = true
return w.driver.client.DeleteBlob(w.driver.container, w.path, nil)
}
func (w *writer) Commit() error {
if w.closed {
return fmt.Errorf("already closed")
} else if w.committed {
return fmt.Errorf("already committed")
} else if w.cancelled {
return fmt.Errorf("already cancelled")
}
w.committed = true
return w.bw.Flush()
}
type blockWriter struct {
client azure.BlobStorageClient
container string
path string
}
func (bw *blockWriter) Write(p []byte) (int, error) {
n := 0
for offset := 0; offset < len(p); offset += maxChunkSize {
chunkSize := maxChunkSize
if offset+chunkSize > len(p) {
chunkSize = len(p) - offset
}
err := bw.client.AppendBlock(bw.container, bw.path, p[offset:offset+chunkSize], nil)
if err != nil {
return n, err
}
n += chunkSize
}
return n, nil
}