azure: fix race condition in PutContent()
See #2077 for background. The PR #1438 which was not reviewed by azure folks basically introduced a race condition around uploads to the same blob by multiple clients concurrently as it used the "writer" type for PutContent(), introduced in #1438. This does chunked upload of blobs using "AppendBlob" type, which was not atomic. Usage of "writer" type and thus AppendBlobs on metadata files is currently not concurrency-safe and generally, they are not the right type of blob for the job. This patch fixes PutContent() to use the atomic upload operation that works for uploads smaller than 64 MB and creates blobs with "BlockBlob" type. To be backwards compatible, we query the type of the blob first and if it is not a "BlockBlob" we delete the blob first before doing an atomic PUT. This creates a small inconsistency/race window "only once". Once the blob is made "BlockBlob", it is overwritten with a single PUT atomicallly next time. Therefore, going forward, PutContent() will be producing BlockBlobs and it will silently migrate the AppendBlobs introduced in #1438 to BlockBlobs with this patch. Tested with existing code side by side, both registries with and without this patch work fine without breaking each other. So this should be good from a backwards/forward compatiblity perspective, with a cost of doing an extra HEAD checking the blob type. Fixes #2077. Signed-off-by: Ahmet Alp Balkan <ahmetalpbalkan@gmail.com>
This commit is contained in:
parent
c599955707
commit
78d0660319
1 changed files with 32 additions and 10 deletions
|
@ -117,19 +117,33 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
||||||
|
|
||||||
// PutContent stores the []byte content at a location designated by "path".
|
// PutContent stores the []byte content at a location designated by "path".
|
||||||
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
||||||
if _, err := d.client.DeleteBlobIfExists(d.container, path, nil); err != nil {
|
if limit := 64 * 1024 * 1024; len(contents) > limit { // max size for block blobs uploaded via single "Put Blob"
|
||||||
return err
|
return fmt.Errorf("uploading %d bytes with PutContent is not supported; limit: %d bytes", len(contents), limit)
|
||||||
}
|
}
|
||||||
writer, err := d.Writer(ctx, path, false)
|
|
||||||
if err != nil {
|
// Historically, blobs uploaded via PutContent used to be of type AppendBlob
|
||||||
return err
|
// (https://github.com/docker/distribution/pull/1438). We can't replace
|
||||||
|
// these blobs atomically via a single "Put Blob" operation without
|
||||||
|
// deleting them first. Once we detect they are BlockBlob type, we can
|
||||||
|
// overwrite them with an atomically "Put Blob" operation.
|
||||||
|
//
|
||||||
|
// While we delete the blob and create a new one, there will be a small
|
||||||
|
// window of inconsistency and if the Put Blob fails, we may end up with
|
||||||
|
// losing the existing data while migrating it to BlockBlob type. However,
|
||||||
|
// expectation is the clients pushing will be retrying when they get an error
|
||||||
|
// response.
|
||||||
|
props, err := d.client.GetBlobProperties(d.container, path)
|
||||||
|
if err != nil && !is404(err) {
|
||||||
|
return fmt.Errorf("failed to get blob properties: %v", err)
|
||||||
}
|
}
|
||||||
defer writer.Close()
|
if err == nil && props.BlobType != azure.BlobTypeBlock {
|
||||||
_, err = writer.Write(contents)
|
if err := d.client.DeleteBlob(d.container, path, nil); err != nil {
|
||||||
if err != nil {
|
return fmt.Errorf("failed to delete legacy blob (%s): %v", props.BlobType, err)
|
||||||
return err
|
}
|
||||||
}
|
}
|
||||||
return writer.Commit()
|
|
||||||
|
r := bytes.NewReader(contents)
|
||||||
|
return d.client.CreateBlockBlobFromReader(d.container, path, uint64(len(contents)), r, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
|
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
|
||||||
|
@ -383,6 +397,14 @@ func (d *driver) listBlobs(container, virtPath string) ([]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func is404(err error) bool {
|
func is404(err error) bool {
|
||||||
|
// handle the case when the request was a HEAD and service error could not
|
||||||
|
// be parsed, such as "storage: service returned without a response body
|
||||||
|
// (404 The specified blob does not exist.)"
|
||||||
|
if strings.Contains(fmt.Sprintf("%v", err), "404 The specified blob does not exist") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// common case
|
||||||
statusCodeErr, ok := err.(azure.AzureStorageServiceError)
|
statusCodeErr, ok := err.(azure.AzureStorageServiceError)
|
||||||
return ok && statusCodeErr.StatusCode == http.StatusNotFound
|
return ok && statusCodeErr.StatusCode == http.StatusNotFound
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue