Merge pull request #2088 from ahmetalpbalkan/pr-upstream-azure-race-fix
azure: fix race condition in PutContent()
This commit is contained in:
commit
15dc1296af
1 changed files with 32 additions and 10 deletions
|
@ -117,19 +117,33 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
||||||
|
|
||||||
// PutContent stores the []byte content at a location designated by "path".
|
// PutContent stores the []byte content at a location designated by "path".
|
||||||
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
||||||
if _, err := d.client.DeleteBlobIfExists(d.container, path, nil); err != nil {
|
if limit := 64 * 1024 * 1024; len(contents) > limit { // max size for block blobs uploaded via single "Put Blob"
|
||||||
return err
|
return fmt.Errorf("uploading %d bytes with PutContent is not supported; limit: %d bytes", len(contents), limit)
|
||||||
}
|
}
|
||||||
writer, err := d.Writer(ctx, path, false)
|
|
||||||
if err != nil {
|
// Historically, blobs uploaded via PutContent used to be of type AppendBlob
|
||||||
return err
|
// (https://github.com/docker/distribution/pull/1438). We can't replace
|
||||||
|
// these blobs atomically via a single "Put Blob" operation without
|
||||||
|
// deleting them first. Once we detect they are BlockBlob type, we can
|
||||||
|
// overwrite them with an atomically "Put Blob" operation.
|
||||||
|
//
|
||||||
|
// While we delete the blob and create a new one, there will be a small
|
||||||
|
// window of inconsistency and if the Put Blob fails, we may end up with
|
||||||
|
// losing the existing data while migrating it to BlockBlob type. However,
|
||||||
|
// expectation is the clients pushing will be retrying when they get an error
|
||||||
|
// response.
|
||||||
|
props, err := d.client.GetBlobProperties(d.container, path)
|
||||||
|
if err != nil && !is404(err) {
|
||||||
|
return fmt.Errorf("failed to get blob properties: %v", err)
|
||||||
}
|
}
|
||||||
defer writer.Close()
|
if err == nil && props.BlobType != azure.BlobTypeBlock {
|
||||||
_, err = writer.Write(contents)
|
if err := d.client.DeleteBlob(d.container, path, nil); err != nil {
|
||||||
if err != nil {
|
return fmt.Errorf("failed to delete legacy blob (%s): %v", props.BlobType, err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
return writer.Commit()
|
}
|
||||||
|
|
||||||
|
r := bytes.NewReader(contents)
|
||||||
|
return d.client.CreateBlockBlobFromReader(d.container, path, uint64(len(contents)), r, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
|
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
|
||||||
|
@ -383,6 +397,14 @@ func (d *driver) listBlobs(container, virtPath string) ([]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func is404(err error) bool {
|
func is404(err error) bool {
|
||||||
|
// handle the case when the request was a HEAD and service error could not
|
||||||
|
// be parsed, such as "storage: service returned without a response body
|
||||||
|
// (404 The specified blob does not exist.)"
|
||||||
|
if strings.Contains(fmt.Sprintf("%v", err), "404 The specified blob does not exist") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// common case
|
||||||
statusCodeErr, ok := err.(azure.AzureStorageServiceError)
|
statusCodeErr, ok := err.(azure.AzureStorageServiceError)
|
||||||
return ok && statusCodeErr.StatusCode == http.StatusNotFound
|
return ok && statusCodeErr.StatusCode == http.StatusNotFound
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue