diff --git a/registry/storage/driver/azure/azure.go b/registry/storage/driver/azure/azure.go
index 585c8b43..7c637aa4 100644
--- a/registry/storage/driver/azure/azure.go
+++ b/registry/storage/driver/azure/azure.go
@@ -18,6 +18,7 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
 )
 
@@ -26,6 +27,8 @@ const (
 	maxChunkSize = 4 * 1024 * 1024
 )
 
+var _ storagedriver.StorageDriver = &driver{}
+
 type driver struct {
 	azClient *azureClient
 	client   *container.Client
@@ -34,11 +37,15 @@ type driver struct {
 	copyStatusPollDelay    time.Duration
 }
 
-type baseEmbed struct{ base.Base }
+type baseEmbed struct {
+	base.Base
+}
 
 // Driver is a storagedriver.StorageDriver implementation backed by
 // Microsoft Azure Blob Storage Service.
-type Driver struct{ baseEmbed }
+type Driver struct {
+	baseEmbed
+}
 
 func init() {
 	factory.Register(driverName, &azureDriverFactory{})
@@ -74,7 +81,12 @@ func New(params *Parameters) (*Driver, error) {
 		copyStatusPollMaxRetry: params.CopyStatusPollMaxRetry,
 		copyStatusPollDelay:    copyStatusPollDelay,
 	}
-	return &Driver{baseEmbed: baseEmbed{Base: base.Base{StorageDriver: d}}}, nil
+	return &Driver{
+		baseEmbed: baseEmbed{
+			Base: base.Base{
+				StorageDriver: d,
+			},
+		}}, nil
 }
 
 // Implement the storagedriver.StorageDriver interface.
@@ -84,25 +96,25 @@ func (d *driver) Name() string {
 
 // GetContent retrieves the content stored at "path" as a []byte.
 func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
-	downloadResponse, err := d.client.NewBlobClient(d.blobName(path)).DownloadStream(ctx, nil)
+	// TODO(milosgajdos): should we get a RetryReader here?
+	resp, err := d.client.NewBlobClient(d.blobName(path)).DownloadStream(ctx, nil)
 	if err != nil {
 		if is404(err) {
 			return nil, storagedriver.PathNotFoundError{Path: path}
 		}
 		return nil, err
 	}
-	body := downloadResponse.Body
-	defer body.Close()
-	return io.ReadAll(body)
+	defer resp.Body.Close()
+	return io.ReadAll(resp.Body)
 }
 
 // PutContent stores the []byte content at a location designated by "path".
 func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
+	// TODO(milosgajdos): this check is not needed as UploadBuffer will return error if we exceed the max blockbytes limit
 	// max size for block blobs uploaded via single "Put Blob" for version after "2016-05-31"
 	// https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob#remarks
-	const limit = 256 * 1024 * 1024
-	if len(contents) > limit {
-		return fmt.Errorf("uploading %d bytes with PutContent is not supported; limit: %d bytes", len(contents), limit)
+	if len(contents) > blockblob.MaxUploadBlobBytes {
+		return fmt.Errorf("content size (%d) exceeds max allowed limit (%d)", len(contents), blockblob.MaxUploadBlobBytes)
 	}
 
 	// Historically, blobs uploaded via PutContent used to be of type AppendBlob
@@ -128,6 +140,7 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e
 		}
 	}
 
+	// TODO(milosgajdos): should we set some concurrency options on UploadBuffer
 	_, err = d.client.NewBlockBlobClient(blobName).UploadBuffer(ctx, contents, nil)
 	return err
 }
@@ -149,13 +162,14 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
 		return nil, fmt.Errorf("failed to get blob properties: %v", err)
 	}
 	if props.ContentLength == nil {
-		return nil, fmt.Errorf("failed to get ContentLength for path: %s", path)
+		return nil, fmt.Errorf("missing ContentLength: %s", path)
 	}
 	size := *props.ContentLength
 	if offset >= size {
 		return io.NopCloser(bytes.NewReader(nil)), nil
 	}
 
+	// TODO(milosgajdos): should we get a RetryReader here?
 	resp, err := blobRef.DownloadStream(ctx, &options)
 	if err != nil {
 		if is404(err) {
@@ -168,7 +182,7 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
 
 // Writer returns a FileWriter which will store the content written to it
 // at the location designated by "path" after the call to Commit.
-func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
+func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (storagedriver.FileWriter, error) {
 	blobName := d.blobName(path)
 	blobRef := d.client.NewBlobClient(blobName)
 
@@ -183,9 +197,9 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged
 
 	var size int64
 	if blobExists {
-		if append {
+		if appendMode {
 			if props.ContentLength == nil {
-				return nil, fmt.Errorf("cannot append to blob because no ContentLength property was returned for: %s", blobName)
+				return nil, fmt.Errorf("missing ContentLength: %s", blobName)
 			}
 			size = *props.ContentLength
 		} else {
@@ -194,7 +208,7 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged
 			}
 		}
 	} else {
-		if append {
+		if appendMode {
 			return nil, storagedriver.PathNotFoundError{Path: path}
 		}
 		if _, err = d.client.NewAppendBlobClient(blobName).Create(ctx, nil); err != nil {
@@ -225,14 +239,15 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
 	}
 
 	if len(missing) > 0 {
-		return nil, fmt.Errorf("required blob properties %s are missing for blob: %s", missing, blobName)
+		return nil, fmt.Errorf("missing required properties (%s) for blob: %s", missing, blobName)
 	}
-	return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
-		Path:    path,
-		Size:    *props.ContentLength,
-		ModTime: *props.LastModified,
-		IsDir:   false,
-	}}, nil
+	return storagedriver.FileInfoInternal{
+		FileInfoFields: storagedriver.FileInfoFields{
+			Path:    path,
+			Size:    *props.ContentLength,
+			ModTime: *props.LastModified,
+			IsDir:   false,
+		}}, nil
 }
 
 // Check if path is a virtual container
@@ -253,10 +268,11 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
 		}
 		if len(resp.Segment.BlobItems) > 0 {
 			// path is a virtual container
-			return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
-				Path:  path,
-				IsDir: true,
-			}}, nil
+			return storagedriver.FileInfoInternal{
+				FileInfoFields: storagedriver.FileInfoFields{
+					Path:  path,
+					IsDir: true,
+				}}, nil
 		}
 	}
 
@@ -452,10 +468,9 @@ func (d *driver) listBlobs(ctx context.Context, virtPath string) ([]string, erro
 		}
 		for _, blob := range resp.Segment.BlobItems {
 			if blob.Name == nil {
-				return nil, fmt.Errorf("required blob property Name is missing while listing blobs under: %s", listPrefix)
+				return nil, fmt.Errorf("missing blob Name when listing prefix: %s", listPrefix)
 			}
-			name := *blob.Name
-			out = append(out, strings.Replace(name, blobPrefix, prefix, 1))
+			out = append(out, strings.Replace(*blob.Name, blobPrefix, prefix, 1))
 		}
 	}
 
@@ -473,6 +488,7 @@ func (d *driver) blobName(path string) string {
 	return strings.TrimLeft(strings.TrimRight(d.rootDirectory, "/")+path, "/")
 }
 
+// TODO(milosgajdos): consider renaming this func
 func is404(err error) bool {
 	return bloberror.HasCode(
 		err,
@@ -483,6 +499,8 @@ func is404(err error) bool {
 	)
 }
 
+var _ storagedriver.FileWriter = &writer{}
+
 type writer struct {
 	driver    *driver
 	path      string
@@ -498,6 +516,7 @@ func (d *driver) newWriter(ctx context.Context, path string, size int64) storage
 		driver: d,
 		path:   path,
 		size:   size,
+		// TODO(milosgajdos): I'm not sure about the maxChunkSize
 		bw: bufio.NewWriterSize(&blockWriter{
 			ctx:    ctx,
 			client: d.client,
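
A note on the two added `var _ ...` declarations: `var _ storagedriver.StorageDriver = &driver{}` and `var _ storagedriver.FileWriter = &writer{}` are compile-time interface guards, a standard Go idiom that turns a missing or mis-typed method into a build error at the declaration site instead of a failure at some distant use site. A minimal self-contained sketch of the idiom; the nopWriter type here is illustrative and not part of the driver:

package main

import "io"

// nopWriter is a throwaway type standing in for *driver / *writer.
type nopWriter struct{}

// Write gives nopWriter the full io.Writer method set.
func (nopWriter) Write(p []byte) (int, error) { return len(p), nil }

// Compile-time interface guard: if nopWriter ever stops satisfying
// io.Writer, the build fails on this line rather than at a call site.
var _ io.Writer = nopWriter{}

func main() {}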
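
On the two "should we get a RetryReader here?" TODOs: the azblob SDK can wrap a download stream in a retrying reader via the response's NewRetryReader method, which re-establishes the connection if the body breaks mid-read. A hedged sketch of how GetContent could use it, mirroring the body shown in the diff above; the MaxRetries value is an illustrative placeholder, not a tested recommendation:

func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
	resp, err := d.client.NewBlobClient(d.blobName(path)).DownloadStream(ctx, nil)
	if err != nil {
		if is404(err) {
			return nil, storagedriver.PathNotFoundError{Path: path}
		}
		return nil, err
	}
	// Wrap the body so transient mid-stream failures are retried by the
	// SDK instead of surfacing as read errors. MaxRetries: 3 is a guess.
	rr := resp.NewRetryReader(ctx, &blob.RetryReaderOptions{MaxRetries: 3})
	defer rr.Close()
	return io.ReadAll(rr)
}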
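
On the "should we set some concurrency options on UploadBuffer" TODO: passing nil, as the diff does, keeps the SDK defaults; blockblob.UploadBufferOptions exposes BlockSize and Concurrency, which control how the buffer is split into blocks and how many blocks are staged in parallel. A hedged sketch reusing the file's existing maxChunkSize constant; the Concurrency value is an illustrative placeholder:

_, err = d.client.NewBlockBlobClient(blobName).UploadBuffer(ctx, contents,
	&blockblob.UploadBufferOptions{
		// Stage the buffer in maxChunkSize (4 MiB) blocks, matching the
		// chunk size this driver already uses for its buffered writer.
		BlockSize: maxChunkSize,
		// Upload up to 4 blocks in parallel (assumed value, untested).
		Concurrency: 4,
	})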