cleanup: a small Azure driver cleanup (#4138)
This commit is contained in:
commit
d153e1dc5b
1 changed files with 48 additions and 29 deletions
|
@ -18,6 +18,7 @@ import (
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
|
||||||
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -26,6 +27,8 @@ const (
|
||||||
maxChunkSize = 4 * 1024 * 1024
|
maxChunkSize = 4 * 1024 * 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var _ storagedriver.StorageDriver = &driver{}
|
||||||
|
|
||||||
type driver struct {
|
type driver struct {
|
||||||
azClient *azureClient
|
azClient *azureClient
|
||||||
client *container.Client
|
client *container.Client
|
||||||
|
@ -34,11 +37,15 @@ type driver struct {
|
||||||
copyStatusPollDelay time.Duration
|
copyStatusPollDelay time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type baseEmbed struct{ base.Base }
|
type baseEmbed struct {
|
||||||
|
base.Base
|
||||||
|
}
|
||||||
|
|
||||||
// Driver is a storagedriver.StorageDriver implementation backed by
|
// Driver is a storagedriver.StorageDriver implementation backed by
|
||||||
// Microsoft Azure Blob Storage Service.
|
// Microsoft Azure Blob Storage Service.
|
||||||
type Driver struct{ baseEmbed }
|
type Driver struct {
|
||||||
|
baseEmbed
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
factory.Register(driverName, &azureDriverFactory{})
|
factory.Register(driverName, &azureDriverFactory{})
|
||||||
|
@ -74,7 +81,12 @@ func New(ctx context.Context, params *Parameters) (*Driver, error) {
|
||||||
copyStatusPollMaxRetry: params.CopyStatusPollMaxRetry,
|
copyStatusPollMaxRetry: params.CopyStatusPollMaxRetry,
|
||||||
copyStatusPollDelay: copyStatusPollDelay,
|
copyStatusPollDelay: copyStatusPollDelay,
|
||||||
}
|
}
|
||||||
return &Driver{baseEmbed: baseEmbed{Base: base.Base{StorageDriver: d}}}, nil
|
return &Driver{
|
||||||
|
baseEmbed: baseEmbed{
|
||||||
|
Base: base.Base{
|
||||||
|
StorageDriver: d,
|
||||||
|
},
|
||||||
|
}}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implement the storagedriver.StorageDriver interface.
|
// Implement the storagedriver.StorageDriver interface.
|
||||||
|
@ -84,25 +96,25 @@ func (d *driver) Name() string {
|
||||||
|
|
||||||
// GetContent retrieves the content stored at "path" as a []byte.
|
// GetContent retrieves the content stored at "path" as a []byte.
|
||||||
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
||||||
downloadResponse, err := d.client.NewBlobClient(d.blobName(path)).DownloadStream(ctx, nil)
|
// TODO(milosgajdos): should we get a RetryReader here?
|
||||||
|
resp, err := d.client.NewBlobClient(d.blobName(path)).DownloadStream(ctx, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if is404(err) {
|
if is404(err) {
|
||||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
body := downloadResponse.Body
|
defer resp.Body.Close()
|
||||||
defer body.Close()
|
return io.ReadAll(resp.Body)
|
||||||
return io.ReadAll(body)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutContent stores the []byte content at a location designated by "path".
|
// PutContent stores the []byte content at a location designated by "path".
|
||||||
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
||||||
|
// TODO(milosgajdos): this check is not needed as UploadBuffer will return error if we exceed the max blockbytes limit
|
||||||
// max size for block blobs uploaded via single "Put Blob" for version after "2016-05-31"
|
// max size for block blobs uploaded via single "Put Blob" for version after "2016-05-31"
|
||||||
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob#remarks
|
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob#remarks
|
||||||
const limit = 256 * 1024 * 1024
|
if len(contents) > blockblob.MaxUploadBlobBytes {
|
||||||
if len(contents) > limit {
|
return fmt.Errorf("content size exceeds max allowed limit (%d): %d", blockblob.MaxUploadBlobBytes, len(contents))
|
||||||
return fmt.Errorf("uploading %d bytes with PutContent is not supported; limit: %d bytes", len(contents), limit)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Historically, blobs uploaded via PutContent used to be of type AppendBlob
|
// Historically, blobs uploaded via PutContent used to be of type AppendBlob
|
||||||
|
@ -128,6 +140,7 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(milosgajdos): should we set some concurrency options on UploadBuffer
|
||||||
_, err = d.client.NewBlockBlobClient(blobName).UploadBuffer(ctx, contents, nil)
|
_, err = d.client.NewBlockBlobClient(blobName).UploadBuffer(ctx, contents, nil)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -149,13 +162,14 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
|
||||||
return nil, fmt.Errorf("failed to get blob properties: %v", err)
|
return nil, fmt.Errorf("failed to get blob properties: %v", err)
|
||||||
}
|
}
|
||||||
if props.ContentLength == nil {
|
if props.ContentLength == nil {
|
||||||
return nil, fmt.Errorf("failed to get ContentLength for path: %s", path)
|
return nil, fmt.Errorf("missing ContentLength: %s", path)
|
||||||
}
|
}
|
||||||
size := *props.ContentLength
|
size := *props.ContentLength
|
||||||
if offset >= size {
|
if offset >= size {
|
||||||
return io.NopCloser(bytes.NewReader(nil)), nil
|
return io.NopCloser(bytes.NewReader(nil)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(milosgajdos): should we get a RetryReader here?
|
||||||
resp, err := blobRef.DownloadStream(ctx, &options)
|
resp, err := blobRef.DownloadStream(ctx, &options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if is404(err) {
|
if is404(err) {
|
||||||
|
@ -168,7 +182,7 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
|
||||||
|
|
||||||
// Writer returns a FileWriter which will store the content written to it
|
// Writer returns a FileWriter which will store the content written to it
|
||||||
// at the location designated by "path" after the call to Commit.
|
// at the location designated by "path" after the call to Commit.
|
||||||
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
|
func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (storagedriver.FileWriter, error) {
|
||||||
blobName := d.blobName(path)
|
blobName := d.blobName(path)
|
||||||
blobRef := d.client.NewBlobClient(blobName)
|
blobRef := d.client.NewBlobClient(blobName)
|
||||||
|
|
||||||
|
@ -183,9 +197,9 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged
|
||||||
|
|
||||||
var size int64
|
var size int64
|
||||||
if blobExists {
|
if blobExists {
|
||||||
if append {
|
if appendMode {
|
||||||
if props.ContentLength == nil {
|
if props.ContentLength == nil {
|
||||||
return nil, fmt.Errorf("cannot append to blob because no ContentLength property was returned for: %s", blobName)
|
return nil, fmt.Errorf("missing ContentLength: %s", blobName)
|
||||||
}
|
}
|
||||||
size = *props.ContentLength
|
size = *props.ContentLength
|
||||||
} else {
|
} else {
|
||||||
|
@ -194,7 +208,7 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if append {
|
if appendMode {
|
||||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
}
|
||||||
if _, err = d.client.NewAppendBlobClient(blobName).Create(ctx, nil); err != nil {
|
if _, err = d.client.NewAppendBlobClient(blobName).Create(ctx, nil); err != nil {
|
||||||
|
@ -225,9 +239,10 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(missing) > 0 {
|
if len(missing) > 0 {
|
||||||
return nil, fmt.Errorf("required blob properties %s are missing for blob: %s", missing, blobName)
|
return nil, fmt.Errorf("missing required prroperties (%s) for blob: %s", missing, blobName)
|
||||||
}
|
}
|
||||||
return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
|
return storagedriver.FileInfoInternal{
|
||||||
|
FileInfoFields: storagedriver.FileInfoFields{
|
||||||
Path: path,
|
Path: path,
|
||||||
Size: *props.ContentLength,
|
Size: *props.ContentLength,
|
||||||
ModTime: *props.LastModified,
|
ModTime: *props.LastModified,
|
||||||
|
@ -253,7 +268,8 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
|
||||||
}
|
}
|
||||||
if len(resp.Segment.BlobItems) > 0 {
|
if len(resp.Segment.BlobItems) > 0 {
|
||||||
// path is a virtual container
|
// path is a virtual container
|
||||||
return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
|
return storagedriver.FileInfoInternal{
|
||||||
|
FileInfoFields: storagedriver.FileInfoFields{
|
||||||
Path: path,
|
Path: path,
|
||||||
IsDir: true,
|
IsDir: true,
|
||||||
}}, nil
|
}}, nil
|
||||||
|
@ -452,10 +468,9 @@ func (d *driver) listBlobs(ctx context.Context, virtPath string) ([]string, erro
|
||||||
}
|
}
|
||||||
for _, blob := range resp.Segment.BlobItems {
|
for _, blob := range resp.Segment.BlobItems {
|
||||||
if blob.Name == nil {
|
if blob.Name == nil {
|
||||||
return nil, fmt.Errorf("required blob property Name is missing while listing blobs under: %s", listPrefix)
|
return nil, fmt.Errorf("missing blob Name when listing prefix: %s", listPrefix)
|
||||||
}
|
}
|
||||||
name := *blob.Name
|
out = append(out, strings.Replace(*blob.Name, blobPrefix, prefix, 1))
|
||||||
out = append(out, strings.Replace(name, blobPrefix, prefix, 1))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -473,6 +488,7 @@ func (d *driver) blobName(path string) string {
|
||||||
return strings.TrimLeft(strings.TrimRight(d.rootDirectory, "/")+path, "/")
|
return strings.TrimLeft(strings.TrimRight(d.rootDirectory, "/")+path, "/")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(milosgajdos): consider renaming this func
|
||||||
func is404(err error) bool {
|
func is404(err error) bool {
|
||||||
return bloberror.HasCode(
|
return bloberror.HasCode(
|
||||||
err,
|
err,
|
||||||
|
@ -483,6 +499,8 @@ func is404(err error) bool {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ storagedriver.FileWriter = &writer{}
|
||||||
|
|
||||||
type writer struct {
|
type writer struct {
|
||||||
driver *driver
|
driver *driver
|
||||||
path string
|
path string
|
||||||
|
@ -498,6 +516,7 @@ func (d *driver) newWriter(ctx context.Context, path string, size int64) storage
|
||||||
driver: d,
|
driver: d,
|
||||||
path: path,
|
path: path,
|
||||||
size: size,
|
size: size,
|
||||||
|
// TODO(milosgajdos): I'm not sure about the maxChunkSize
|
||||||
bw: bufio.NewWriterSize(&blockWriter{
|
bw: bufio.NewWriterSize(&blockWriter{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
client: d.client,
|
client: d.client,
|
||||||
|
|
Loading…
Reference in a new issue