azureblob: use the rclone HTTP client - fixes #2654

This enables --dump headers and --timeout to work properly.
This commit is contained in:
Nick Craig-Wood 2018-11-30 12:08:22 +00:00
parent 35fba5bfdd
commit 5d1d93e163

View file

@ -22,12 +22,14 @@ import (
"sync"
"time"
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/accounting"
"github.com/ncw/rclone/fs/config/configmap"
"github.com/ncw/rclone/fs/config/configstruct"
"github.com/ncw/rclone/fs/fserrors"
"github.com/ncw/rclone/fs/fshttp"
"github.com/ncw/rclone/fs/hash"
"github.com/ncw/rclone/fs/walk"
"github.com/ncw/rclone/lib/pacer"
@ -135,6 +137,7 @@ type Fs struct {
root string // the path we are working on if any
opt Options // parsed config options
features *fs.Features // optional features
client *http.Client // http client we are using
svcURL *azblob.ServiceURL // reference to serviceURL
cntURL *azblob.ContainerURL // reference to containerURL
container string // the container we are working on
@ -272,6 +275,38 @@ func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
return
}
// httpClientFactory creates a Factory object that sends HTTP requests
// to a rclone's http.Client.
//
// copied from azblob.newDefaultHTTPClientFactory
func httpClientFactory(client *http.Client) pipeline.Factory {
return pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
r, err := client.Do(request.WithContext(ctx))
if err != nil {
err = pipeline.NewError(err, "HTTP request failed")
}
return pipeline.NewHTTPResponse(r), err
}
})
}
// newPipeline creates a Pipeline using the specified credentials and options.
//
// this code was copied from azblob.NewPipeline
func (f *Fs) newPipeline(c azblob.Credential, o azblob.PipelineOptions) pipeline.Pipeline {
// Closest to API goes first; closest to the wire goes last
factories := []pipeline.Factory{
azblob.NewTelemetryPolicyFactory(o.Telemetry),
azblob.NewUniqueRequestIDPolicyFactory(),
azblob.NewRetryPolicyFactory(o.Retry),
c,
pipeline.MethodFactoryMarker(), // indicates at what stage in the pipeline the method factory is invoked
azblob.NewRequestLogPolicyFactory(o.RequestLog),
}
return pipeline.NewPipeline(factories, pipeline.Options{HTTPSender: httpClientFactory(f.client), Log: o.Log})
}
// NewFs contstructs an Fs from the path, container:path
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
// Parse config into Options struct
@ -307,6 +342,23 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
string(azblob.AccessTierHot), string(azblob.AccessTierCool), string(azblob.AccessTierArchive))
}
f := &Fs{
name: name,
opt: *opt,
container: container,
root: directory,
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
client: fshttp.NewClient(fs.Config),
}
f.features = (&fs.Features{
ReadMimeType: true,
WriteMimeType: true,
BucketBased: true,
SetTier: true,
GetTier: true,
}).Fill(f)
var (
u *url.URL
serviceURL azblob.ServiceURL
@ -323,7 +375,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to make azure storage url from account and endpoint")
}
pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
serviceURL = azblob.NewServiceURL(*u, pipeline)
containerURL = serviceURL.NewContainerURL(container)
case opt.SASURL != "":
@ -332,7 +384,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
return nil, errors.Wrapf(err, "failed to parse SAS URL")
}
// use anonymous credentials in case of sas url
pipeline := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
pipeline := f.newPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
// Check if we have container level SAS or account level sas
parts := azblob.NewBlobURLParts(*u)
if parts.ContainerName != "" {
@ -349,24 +401,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
default:
return nil, errors.New("Need account+key or connectionString or sasURL")
}
f.svcURL = &serviceURL
f.cntURL = &containerURL
f := &Fs{
name: name,
opt: *opt,
container: container,
root: directory,
svcURL: &serviceURL,
cntURL: &containerURL,
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
}
f.features = (&fs.Features{
ReadMimeType: true,
WriteMimeType: true,
BucketBased: true,
SetTier: true,
GetTier: true,
}).Fill(f)
if f.root != "" {
f.root += "/"
// Check to see if the (container,directory) is actually an existing file