diff --git a/docs/configuration.md b/docs/configuration.md index 35c32656..0580cec7 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -103,6 +103,8 @@ storage: clientid: client_id_string tenantid: tenant_id_string secret: secret_string + copy_status_poll_max_retry: 10 + copy_status_poll_delay: 100ms gcs: bucket: bucketname keyfile: /path/to/keyfile diff --git a/docs/storage-drivers/azure.md b/docs/storage-drivers/azure.md index 01826e61..50f03a2b 100644 --- a/docs/storage-drivers/azure.md +++ b/docs/storage-drivers/azure.md @@ -8,12 +8,14 @@ An implementation of the `storagedriver.StorageDriver` interface which uses [Mic ## Parameters -| Parameter | Required | Description | -|:--------------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `accountname` | yes | Name of the Azure Storage Account. | -| `accountkey` | yes | Primary or Secondary Key for the Storage Account. | -| `container` | yes | Name of the Azure root storage container in which all registry data is stored. Must comply the storage container name [requirements](https://docs.microsoft.com/rest/api/storageservices/fileservices/naming-and-referencing-containers--blobs--and-metadata). For example, if your url is `https://myaccount.blob.core.windows.net/myblob` use the container value of `myblob`.| -| `realm` | no | Domain name suffix for the Storage Service API endpoint. For example realm for "Azure in China" would be `core.chinacloudapi.cn` and realm for "Azure Government" would be `core.usgovcloudapi.net`. By default, this is `core.windows.net`. | +| Parameter | Required | Description | +|:-----------------------------------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `accountname` | yes | Name of the Azure Storage Account. | +| `accountkey` | yes | Primary or Secondary Key for the Storage Account. | +| `container` | yes | Name of the Azure root storage container in which all registry data is stored. Must comply the storage container name [requirements](https://docs.microsoft.com/rest/api/storageservices/fileservices/naming-and-referencing-containers--blobs--and-metadata). For example, if your url is `https://myaccount.blob.core.windows.net/myblob` use the container value of `myblob`.| +| `realm` | no | Domain name suffix for the Storage Service API endpoint. For example realm for "Azure in China" would be `core.chinacloudapi.cn` and realm for "Azure Government" would be `core.usgovcloudapi.net`. By default, this is `core.windows.net`. | +| `copy_status_poll_max_retry` | no | Max retry number for polling of copy operation status. Retries use a simple backoff algorithm where each retry number is multiplied by `copy_status_poll_delay`, and this number is used as the delay. Set to -1 to disable retries and abort if the copy does not complete immediately. Defaults to 5. | +| `copy_status_poll_delay` | no | Time to wait between retries for polling of copy operation status. This time is multiplied by N on each retry, where N is the retry number. Defaults to 100ms | ## Related information diff --git a/registry/storage/driver/azure/azure.go b/registry/storage/driver/azure/azure.go index 88aea274..f2cb1101 100644 --- a/registry/storage/driver/azure/azure.go +++ b/registry/storage/driver/azure/azure.go @@ -21,16 +21,17 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" ) -const driverName = "azure" - const ( + driverName = "azure" maxChunkSize = 4 * 1024 * 1024 ) type driver struct { - azClient *azureClient - client *container.Client - rootDirectory string + azClient *azureClient + client *container.Client + rootDirectory string + copyStatusPollMaxRetry int + copyStatusPollDelay time.Duration } type baseEmbed struct{ base.Base } @@ -59,11 +60,19 @@ func New(params *Parameters) (*Driver, error) { if err != nil { return nil, err } + + copyStatusPollDelay, err := time.ParseDuration(params.CopyStatusPollDelay) + if err != nil { + return nil, err + } + client := azClient.ContainerClient() d := &driver{ - azClient: azClient, - client: client, - rootDirectory: params.RootDirectory, + azClient: azClient, + client: client, + rootDirectory: params.RootDirectory, + copyStatusPollMaxRetry: params.CopyStatusPollMaxRetry, + copyStatusPollDelay: copyStatusPollDelay, } return &Driver{baseEmbed: baseEmbed{Base: base.Base{StorageDriver: d}}}, nil } @@ -282,7 +291,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e return err } destBlobRef := d.client.NewBlockBlobClient(d.blobName(destPath)) - _, err = destBlobRef.CopyFromURL(ctx, sourceBlobURL, nil) + resp, err := destBlobRef.StartCopyFromURL(ctx, sourceBlobURL, nil) if err != nil { if is404(err) { return storagedriver.PathNotFoundError{Path: sourcePath} @@ -290,6 +299,39 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e return err } + copyStatus := *resp.CopyStatus + + if d.copyStatusPollMaxRetry == -1 && copyStatus == blob.CopyStatusTypePending { + destBlobRef.AbortCopyFromURL(ctx, *resp.CopyID, nil) + return nil + } + + retryCount := 1 + for copyStatus == blob.CopyStatusTypePending { + props, err := destBlobRef.GetProperties(ctx, nil) + if err != nil { + return err + } + + if retryCount >= d.copyStatusPollMaxRetry { + destBlobRef.AbortCopyFromURL(ctx, *props.CopyID, nil) + return fmt.Errorf("max retries for copy polling reached, aborting copy") + } + + copyStatus = *props.CopyStatus + if copyStatus == blob.CopyStatusTypeAborted || copyStatus == blob.CopyStatusTypeFailed { + if props.CopyStatusDescription != nil { + return fmt.Errorf("failed to move blob: %s", *props.CopyStatusDescription) + } + return fmt.Errorf("failed to move blob with copy id %s", *props.CopyID) + } + + if copyStatus == blob.CopyStatusTypePending { + time.Sleep(d.copyStatusPollDelay * time.Duration(retryCount)) + } + retryCount++ + } + _, err = d.client.NewBlobClient(d.blobName(sourcePath)).Delete(ctx, nil) return err } diff --git a/registry/storage/driver/azure/azure_test.go b/registry/storage/driver/azure/azure_test.go index 42a8a9b4..5fbd01ad 100644 --- a/registry/storage/driver/azure/azure_test.go +++ b/registry/storage/driver/azure/azure_test.go @@ -1,7 +1,9 @@ package azure import ( + "context" "fmt" + "math/rand" "os" "strings" "testing" @@ -19,6 +21,8 @@ const ( envRootDirectory = "AZURE_ROOT_DIRECTORY" ) +var azureDriverConstructor func() (storagedriver.StorageDriver, error) + // Hook up gocheck into the "go test" runner. func Test(t *testing.T) { check.TestingT(t) } @@ -36,10 +40,10 @@ func init() { value *string missingOk bool }{ - {envAccountName, &accountName, false}, - {envAccountKey, &accountKey, false}, - {envContainer, &container, false}, - {envRealm, &realm, false}, + {envAccountName, &accountName, true}, + {envAccountKey, &accountKey, true}, + {envContainer, &container, true}, + {envRealm, &realm, true}, {envRootDirectory, &rootDirectory, true}, } @@ -51,7 +55,7 @@ func init() { } } - azureDriverConstructor := func() (storagedriver.StorageDriver, error) { + azureDriverConstructor = func() (storagedriver.StorageDriver, error) { parameters := map[string]interface{}{ "container": container, "accountname": accountName, @@ -77,6 +81,66 @@ func init() { testsuites.RegisterSuite(azureDriverConstructor, skipCheck) } +var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randStringRunes(n int) string { + b := make([]rune, n) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return string(b) +} + +func TestCommitAfterMove(t *testing.T) { + driver, err := azureDriverConstructor() + if err != nil { + t.Fatalf("unexpected error creating azure driver: %v", err) + } + + contents := randStringRunes(4 * 1024 * 1024) + sourcePath := "/source/file" + destPath := "/dest/file" + ctx := context.Background() + + defer driver.Delete(ctx, sourcePath) + defer driver.Delete(ctx, destPath) + + writer, err := driver.Writer(ctx, sourcePath, false) + if err != nil { + t.Fatalf("unexpected error from driver.Writer: %v", err) + } + + _, err = writer.Write([]byte(contents)) + if err != nil { + t.Fatalf("writer.Write: unexpected error: %v", err) + } + + err = writer.Commit() + if err != nil { + t.Fatalf("writer.Commit: unexpected error: %v", err) + } + + err = writer.Close() + if err != nil { + t.Fatalf("writer.Close: unexpected error: %v", err) + } + + _, err = driver.GetContent(ctx, sourcePath) + if err != nil { + t.Fatalf("driver.GetContent(sourcePath): unexpected error: %v", err) + } + + err = driver.Move(ctx, sourcePath, destPath) + if err != nil { + t.Fatalf("driver.Move: unexpected error: %v", err) + } + + _, err = driver.GetContent(ctx, destPath) + if err != nil { + t.Fatalf("GetContent(destPath): unexpected error: %v", err) + } +} + func TestParamParsing(t *testing.T) { expectErrors := []map[string]interface{}{ {}, diff --git a/registry/storage/driver/azure/parser.go b/registry/storage/driver/azure/parser.go index 825c78d6..c463ac32 100644 --- a/registry/storage/driver/azure/parser.go +++ b/registry/storage/driver/azure/parser.go @@ -8,7 +8,9 @@ import ( ) const ( - defaultRealm = "core.windows.net" + defaultRealm = "core.windows.net" + defaultCopyStatusPollMaxRetry = 5 + defaultCopyStatusPollDelay = "100ms" ) type Credentials struct { @@ -19,14 +21,16 @@ type Credentials struct { } type Parameters struct { - Container string `mapstructure:"container"` - AccountName string `mapstructure:"accountname"` - AccountKey string `mapstructure:"accountkey"` - Credentials Credentials `mapstructure:"credentials"` - ConnectionString string `mapstructure:"connectionstring"` - Realm string `mapstructure:"realm"` - RootDirectory string `mapstructure:"rootdirectory"` - ServiceURL string `mapstructure:"serviceurl"` + Container string `mapstructure:"container"` + AccountName string `mapstructure:"accountname"` + AccountKey string `mapstructure:"accountkey"` + Credentials Credentials `mapstructure:"credentials"` + ConnectionString string `mapstructure:"connectionstring"` + Realm string `mapstructure:"realm"` + RootDirectory string `mapstructure:"rootdirectory"` + ServiceURL string `mapstructure:"serviceurl"` + CopyStatusPollMaxRetry int `mapstructure:"copy_status_poll_max_retry"` + CopyStatusPollDelay string `mapstructure:"copy_status_poll_delay"` } func NewParameters(parameters map[string]interface{}) (*Parameters, error) { @@ -45,5 +49,11 @@ func NewParameters(parameters map[string]interface{}) (*Parameters, error) { if params.ServiceURL == "" { params.ServiceURL = fmt.Sprintf("https://%s.blob.%s", params.AccountName, params.Realm) } + if params.CopyStatusPollMaxRetry == 0 { + params.CopyStatusPollMaxRetry = defaultCopyStatusPollMaxRetry + } + if params.CopyStatusPollDelay == "" { + params.CopyStatusPollDelay = defaultCopyStatusPollDelay + } return ¶ms, nil }