// Package azure provides a storagedriver.StorageDriver implementation to // store blobs in Microsoft Azure Blob Storage Service. package azure import ( "bufio" "bytes" "fmt" "io" "io/ioutil" "net/http" "strings" "time" "github.com/docker/distribution/context" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/base" "github.com/docker/distribution/registry/storage/driver/factory" azure "github.com/Azure/azure-sdk-for-go/storage" ) const driverName = "azure" const ( paramAccountName = "accountname" paramAccountKey = "accountkey" paramContainer = "container" paramRealm = "realm" maxChunkSize = 4 * 1024 * 1024 ) type driver struct { client azure.BlobStorageClient container string } type baseEmbed struct{ base.Base } // Driver is a storagedriver.StorageDriver implementation backed by // Microsoft Azure Blob Storage Service. type Driver struct{ baseEmbed } func init() { factory.Register(driverName, &azureDriverFactory{}) } type azureDriverFactory struct{} func (factory *azureDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { return FromParameters(parameters) } // FromParameters constructs a new Driver with a given parameters map. func FromParameters(parameters map[string]interface{}) (*Driver, error) { accountName, ok := parameters[paramAccountName] if !ok || fmt.Sprint(accountName) == "" { return nil, fmt.Errorf("No %s parameter provided", paramAccountName) } accountKey, ok := parameters[paramAccountKey] if !ok || fmt.Sprint(accountKey) == "" { return nil, fmt.Errorf("No %s parameter provided", paramAccountKey) } container, ok := parameters[paramContainer] if !ok || fmt.Sprint(container) == "" { return nil, fmt.Errorf("No %s parameter provided", paramContainer) } realm, ok := parameters[paramRealm] if !ok || fmt.Sprint(realm) == "" { realm = azure.DefaultBaseURL } return New(fmt.Sprint(accountName), fmt.Sprint(accountKey), fmt.Sprint(container), fmt.Sprint(realm)) } // New constructs a new Driver with the given Azure Storage Account credentials func New(accountName, accountKey, container, realm string) (*Driver, error) { api, err := azure.NewClient(accountName, accountKey, realm, azure.DefaultAPIVersion, true) if err != nil { return nil, err } blobClient := api.GetBlobService() // Create registry container if _, err = blobClient.CreateContainerIfNotExists(container, azure.ContainerAccessTypePrivate); err != nil { return nil, err } d := &driver{ client: blobClient, container: container} return &Driver{baseEmbed: baseEmbed{Base: base.Base{StorageDriver: d}}}, nil } // Implement the storagedriver.StorageDriver interface. func (d *driver) Name() string { return driverName } // GetContent retrieves the content stored at "path" as a []byte. func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { blob, err := d.client.GetBlob(d.container, path) if err != nil { if is404(err) { return nil, storagedriver.PathNotFoundError{Path: path} } return nil, err } defer blob.Close() return ioutil.ReadAll(blob) } // PutContent stores the []byte content at a location designated by "path". func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { if limit := 64 * 1024 * 1024; len(contents) > limit { // max size for block blobs uploaded via single "Put Blob" return fmt.Errorf("uploading %d bytes with PutContent is not supported; limit: %d bytes", len(contents), limit) } // Historically, blobs uploaded via PutContent used to be of type AppendBlob // (https://github.com/docker/distribution/pull/1438). We can't replace // these blobs atomically via a single "Put Blob" operation without // deleting them first. Once we detect they are BlockBlob type, we can // overwrite them with an atomically "Put Blob" operation. // // While we delete the blob and create a new one, there will be a small // window of inconsistency and if the Put Blob fails, we may end up with // losing the existing data while migrating it to BlockBlob type. However, // expectation is the clients pushing will be retrying when they get an error // response. props, err := d.client.GetBlobProperties(d.container, path) if err != nil && !is404(err) { return fmt.Errorf("failed to get blob properties: %v", err) } if err == nil && props.BlobType != azure.BlobTypeBlock { if err := d.client.DeleteBlob(d.container, path, nil); err != nil { return fmt.Errorf("failed to delete legacy blob (%s): %v", props.BlobType, err) } } r := bytes.NewReader(contents) return d.client.CreateBlockBlobFromReader(d.container, path, uint64(len(contents)), r, nil) } // Reader retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { if ok, err := d.client.BlobExists(d.container, path); err != nil { return nil, err } else if !ok { return nil, storagedriver.PathNotFoundError{Path: path} } info, err := d.client.GetBlobProperties(d.container, path) if err != nil { return nil, err } size := int64(info.ContentLength) if offset >= size { return ioutil.NopCloser(bytes.NewReader(nil)), nil } bytesRange := fmt.Sprintf("%v-", offset) resp, err := d.client.GetBlobRange(d.container, path, bytesRange, nil) if err != nil { return nil, err } return resp, nil } // Writer returns a FileWriter which will store the content written to it // at the location designated by "path" after the call to Commit. func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { blobExists, err := d.client.BlobExists(d.container, path) if err != nil { return nil, err } var size int64 if blobExists { if append { blobProperties, err := d.client.GetBlobProperties(d.container, path) if err != nil { return nil, err } size = blobProperties.ContentLength } else { err := d.client.DeleteBlob(d.container, path, nil) if err != nil { return nil, err } } } else { if append { return nil, storagedriver.PathNotFoundError{Path: path} } err := d.client.PutAppendBlob(d.container, path, nil) if err != nil { return nil, err } } return d.newWriter(path, size), nil } // Stat retrieves the FileInfo for the given path, including the current size // in bytes and the creation time. func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { // Check if the path is a blob if ok, err := d.client.BlobExists(d.container, path); err != nil { return nil, err } else if ok { blob, err := d.client.GetBlobProperties(d.container, path) if err != nil { return nil, err } mtim, err := time.Parse(http.TimeFormat, blob.LastModified) if err != nil { return nil, err } return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{ Path: path, Size: int64(blob.ContentLength), ModTime: mtim, IsDir: false, }}, nil } // Check if path is a virtual container virtContainerPath := path if !strings.HasSuffix(virtContainerPath, "/") { virtContainerPath += "/" } blobs, err := d.client.ListBlobs(d.container, azure.ListBlobsParameters{ Prefix: virtContainerPath, MaxResults: 1, }) if err != nil { return nil, err } if len(blobs.Blobs) > 0 { // path is a virtual container return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{ Path: path, IsDir: true, }}, nil } // path is not a blob or virtual container return nil, storagedriver.PathNotFoundError{Path: path} } // List returns a list of the objects that are direct descendants of the given // path. func (d *driver) List(ctx context.Context, path string) ([]string, error) { if path == "/" { path = "" } blobs, err := d.listBlobs(d.container, path) if err != nil { return blobs, err } list := directDescendants(blobs, path) if path != "" && len(list) == 0 { return nil, storagedriver.PathNotFoundError{Path: path} } return list, nil } // Move moves an object stored at sourcePath to destPath, removing the original // object. func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { sourceBlobURL := d.client.GetBlobURL(d.container, sourcePath) err := d.client.CopyBlob(d.container, destPath, sourceBlobURL) if err != nil { if is404(err) { return storagedriver.PathNotFoundError{Path: sourcePath} } return err } return d.client.DeleteBlob(d.container, sourcePath, nil) } // Delete recursively deletes all objects stored at "path" and its subpaths. func (d *driver) Delete(ctx context.Context, path string) error { ok, err := d.client.DeleteBlobIfExists(d.container, path, nil) if err != nil { return err } if ok { return nil // was a blob and deleted, return } // Not a blob, see if path is a virtual container with blobs blobs, err := d.listBlobs(d.container, path) if err != nil { return err } for _, b := range blobs { if err = d.client.DeleteBlob(d.container, b, nil); err != nil { return err } } if len(blobs) == 0 { return storagedriver.PathNotFoundError{Path: path} } return nil } // URLFor returns a publicly accessible URL for the blob stored at given path // for specified duration by making use of Azure Storage Shared Access Signatures (SAS). // See https://msdn.microsoft.com/en-us/library/azure/ee395415.aspx for more info. func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { expiresTime := time.Now().UTC().Add(20 * time.Minute) // default expiration expires, ok := options["expiry"] if ok { t, ok := expires.(time.Time) if ok { expiresTime = t } } return d.client.GetBlobSASURI(d.container, path, expiresTime, "r") } // directDescendants will find direct descendants (blobs or virtual containers) // of from list of blob paths and will return their full paths. Elements in blobs // list must be prefixed with a "/" and // // Example: direct descendants of "/" in {"/foo", "/bar/1", "/bar/2"} is // {"/foo", "/bar"} and direct descendants of "bar" is {"/bar/1", "/bar/2"} func directDescendants(blobs []string, prefix string) []string { if !strings.HasPrefix(prefix, "/") { // add trailing '/' prefix = "/" + prefix } if !strings.HasSuffix(prefix, "/") { // containerify the path prefix += "/" } out := make(map[string]bool) for _, b := range blobs { if strings.HasPrefix(b, prefix) { rel := b[len(prefix):] c := strings.Count(rel, "/") if c == 0 { out[b] = true } else { out[prefix+rel[:strings.Index(rel, "/")]] = true } } } var keys []string for k := range out { keys = append(keys, k) } return keys } func (d *driver) listBlobs(container, virtPath string) ([]string, error) { if virtPath != "" && !strings.HasSuffix(virtPath, "/") { // containerify the path virtPath += "/" } out := []string{} marker := "" for { resp, err := d.client.ListBlobs(d.container, azure.ListBlobsParameters{ Marker: marker, Prefix: virtPath, }) if err != nil { return out, err } for _, b := range resp.Blobs { out = append(out, b.Name) } if len(resp.Blobs) == 0 || resp.NextMarker == "" { break } marker = resp.NextMarker } return out, nil } func is404(err error) bool { // handle the case when the request was a HEAD and service error could not // be parsed, such as "storage: service returned without a response body // (404 The specified blob does not exist.)" if strings.Contains(fmt.Sprintf("%v", err), "404 The specified blob does not exist") { return true } // common case statusCodeErr, ok := err.(azure.AzureStorageServiceError) return ok && statusCodeErr.StatusCode == http.StatusNotFound } type writer struct { driver *driver path string size int64 bw *bufio.Writer closed bool committed bool cancelled bool } func (d *driver) newWriter(path string, size int64) storagedriver.FileWriter { return &writer{ driver: d, path: path, size: size, bw: bufio.NewWriterSize(&blockWriter{ client: d.client, container: d.container, path: path, }, maxChunkSize), } } func (w *writer) Write(p []byte) (int, error) { if w.closed { return 0, fmt.Errorf("already closed") } else if w.committed { return 0, fmt.Errorf("already committed") } else if w.cancelled { return 0, fmt.Errorf("already cancelled") } n, err := w.bw.Write(p) w.size += int64(n) return n, err } func (w *writer) Size() int64 { return w.size } func (w *writer) Close() error { if w.closed { return fmt.Errorf("already closed") } w.closed = true return w.bw.Flush() } func (w *writer) Cancel() error { if w.closed { return fmt.Errorf("already closed") } else if w.committed { return fmt.Errorf("already committed") } w.cancelled = true return w.driver.client.DeleteBlob(w.driver.container, w.path, nil) } func (w *writer) Commit() error { if w.closed { return fmt.Errorf("already closed") } else if w.committed { return fmt.Errorf("already committed") } else if w.cancelled { return fmt.Errorf("already cancelled") } w.committed = true return w.bw.Flush() } type blockWriter struct { client azure.BlobStorageClient container string path string } func (bw *blockWriter) Write(p []byte) (int, error) { n := 0 for offset := 0; offset < len(p); offset += maxChunkSize { chunkSize := maxChunkSize if offset+chunkSize > len(p) { chunkSize = len(p) - offset } err := bw.client.AppendBlock(bw.container, bw.path, p[offset:offset+chunkSize], nil) if err != nil { return n, err } n += chunkSize } return n, nil }