diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go index 9f71653fa..8938e4184 100644 --- a/backend/azureblob/azureblob.go +++ b/backend/azureblob/azureblob.go @@ -22,12 +22,14 @@ import ( "sync" "time" + "github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/accounting" "github.com/ncw/rclone/fs/config/configmap" "github.com/ncw/rclone/fs/config/configstruct" "github.com/ncw/rclone/fs/fserrors" + "github.com/ncw/rclone/fs/fshttp" "github.com/ncw/rclone/fs/hash" "github.com/ncw/rclone/fs/walk" "github.com/ncw/rclone/lib/pacer" @@ -135,6 +137,7 @@ type Fs struct { root string // the path we are working on if any opt Options // parsed config options features *fs.Features // optional features + client *http.Client // http client we are using svcURL *azblob.ServiceURL // reference to serviceURL cntURL *azblob.ContainerURL // reference to containerURL container string // the container we are working on @@ -272,6 +275,38 @@ func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) { return } +// httpClientFactory creates a Factory object that sends HTTP requests +// to a rclone's http.Client. +// +// copied from azblob.newDefaultHTTPClientFactory +func httpClientFactory(client *http.Client) pipeline.Factory { + return pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc { + return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) { + r, err := client.Do(request.WithContext(ctx)) + if err != nil { + err = pipeline.NewError(err, "HTTP request failed") + } + return pipeline.NewHTTPResponse(r), err + } + }) +} + +// newPipeline creates a Pipeline using the specified credentials and options. +// +// this code was copied from azblob.NewPipeline +func (f *Fs) newPipeline(c azblob.Credential, o azblob.PipelineOptions) pipeline.Pipeline { + // Closest to API goes first; closest to the wire goes last + factories := []pipeline.Factory{ + azblob.NewTelemetryPolicyFactory(o.Telemetry), + azblob.NewUniqueRequestIDPolicyFactory(), + azblob.NewRetryPolicyFactory(o.Retry), + c, + pipeline.MethodFactoryMarker(), // indicates at what stage in the pipeline the method factory is invoked + azblob.NewRequestLogPolicyFactory(o.RequestLog), + } + return pipeline.NewPipeline(factories, pipeline.Options{HTTPSender: httpClientFactory(f.client), Log: o.Log}) +} + // NewFs contstructs an Fs from the path, container:path func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { // Parse config into Options struct @@ -307,6 +342,23 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { string(azblob.AccessTierHot), string(azblob.AccessTierCool), string(azblob.AccessTierArchive)) } + f := &Fs{ + name: name, + opt: *opt, + container: container, + root: directory, + pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), + uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers), + client: fshttp.NewClient(fs.Config), + } + f.features = (&fs.Features{ + ReadMimeType: true, + WriteMimeType: true, + BucketBased: true, + SetTier: true, + GetTier: true, + }).Fill(f) + var ( u *url.URL serviceURL azblob.ServiceURL @@ -323,7 +375,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { if err != nil { return nil, errors.Wrap(err, "failed to make azure storage url from account and endpoint") } - pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}}) + pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}}) serviceURL = azblob.NewServiceURL(*u, pipeline) containerURL = serviceURL.NewContainerURL(container) case opt.SASURL != "": @@ -332,7 +384,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { return nil, errors.Wrapf(err, "failed to parse SAS URL") } // use anonymous credentials in case of sas url - pipeline := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}}) + pipeline := f.newPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}}) // Check if we have container level SAS or account level sas parts := azblob.NewBlobURLParts(*u) if parts.ContainerName != "" { @@ -349,24 +401,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { default: return nil, errors.New("Need account+key or connectionString or sasURL") } + f.svcURL = &serviceURL + f.cntURL = &containerURL - f := &Fs{ - name: name, - opt: *opt, - container: container, - root: directory, - svcURL: &serviceURL, - cntURL: &containerURL, - pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant), - uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers), - } - f.features = (&fs.Features{ - ReadMimeType: true, - WriteMimeType: true, - BucketBased: true, - SetTier: true, - GetTier: true, - }).Fill(f) if f.root != "" { f.root += "/" // Check to see if the (container,directory) is actually an existing file