forked from TrueCloudLab/rclone
azureblob: use the rclone HTTP client - fixes #2654
This enables --dump headers and --timeout to work properly.
This commit is contained in:
parent
35fba5bfdd
commit
5d1d93e163
1 changed files with 56 additions and 19 deletions
|
@ -22,12 +22,14 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-pipeline-go/pipeline"
|
||||
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||
"github.com/ncw/rclone/fs"
|
||||
"github.com/ncw/rclone/fs/accounting"
|
||||
"github.com/ncw/rclone/fs/config/configmap"
|
||||
"github.com/ncw/rclone/fs/config/configstruct"
|
||||
"github.com/ncw/rclone/fs/fserrors"
|
||||
"github.com/ncw/rclone/fs/fshttp"
|
||||
"github.com/ncw/rclone/fs/hash"
|
||||
"github.com/ncw/rclone/fs/walk"
|
||||
"github.com/ncw/rclone/lib/pacer"
|
||||
|
@ -135,6 +137,7 @@ type Fs struct {
|
|||
root string // the path we are working on if any
|
||||
opt Options // parsed config options
|
||||
features *fs.Features // optional features
|
||||
client *http.Client // http client we are using
|
||||
svcURL *azblob.ServiceURL // reference to serviceURL
|
||||
cntURL *azblob.ContainerURL // reference to containerURL
|
||||
container string // the container we are working on
|
||||
|
@ -272,6 +275,38 @@ func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// httpClientFactory creates a Factory object that sends HTTP requests
|
||||
// to a rclone's http.Client.
|
||||
//
|
||||
// copied from azblob.newDefaultHTTPClientFactory
|
||||
func httpClientFactory(client *http.Client) pipeline.Factory {
|
||||
return pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
|
||||
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
||||
r, err := client.Do(request.WithContext(ctx))
|
||||
if err != nil {
|
||||
err = pipeline.NewError(err, "HTTP request failed")
|
||||
}
|
||||
return pipeline.NewHTTPResponse(r), err
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// newPipeline creates a Pipeline using the specified credentials and options.
|
||||
//
|
||||
// this code was copied from azblob.NewPipeline
|
||||
func (f *Fs) newPipeline(c azblob.Credential, o azblob.PipelineOptions) pipeline.Pipeline {
|
||||
// Closest to API goes first; closest to the wire goes last
|
||||
factories := []pipeline.Factory{
|
||||
azblob.NewTelemetryPolicyFactory(o.Telemetry),
|
||||
azblob.NewUniqueRequestIDPolicyFactory(),
|
||||
azblob.NewRetryPolicyFactory(o.Retry),
|
||||
c,
|
||||
pipeline.MethodFactoryMarker(), // indicates at what stage in the pipeline the method factory is invoked
|
||||
azblob.NewRequestLogPolicyFactory(o.RequestLog),
|
||||
}
|
||||
return pipeline.NewPipeline(factories, pipeline.Options{HTTPSender: httpClientFactory(f.client), Log: o.Log})
|
||||
}
|
||||
|
||||
// NewFs contstructs an Fs from the path, container:path
|
||||
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||
// Parse config into Options struct
|
||||
|
@ -307,6 +342,23 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
|||
string(azblob.AccessTierHot), string(azblob.AccessTierCool), string(azblob.AccessTierArchive))
|
||||
}
|
||||
|
||||
f := &Fs{
|
||||
name: name,
|
||||
opt: *opt,
|
||||
container: container,
|
||||
root: directory,
|
||||
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
||||
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
|
||||
client: fshttp.NewClient(fs.Config),
|
||||
}
|
||||
f.features = (&fs.Features{
|
||||
ReadMimeType: true,
|
||||
WriteMimeType: true,
|
||||
BucketBased: true,
|
||||
SetTier: true,
|
||||
GetTier: true,
|
||||
}).Fill(f)
|
||||
|
||||
var (
|
||||
u *url.URL
|
||||
serviceURL azblob.ServiceURL
|
||||
|
@ -323,7 +375,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
|||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to make azure storage url from account and endpoint")
|
||||
}
|
||||
pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
||||
pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
||||
serviceURL = azblob.NewServiceURL(*u, pipeline)
|
||||
containerURL = serviceURL.NewContainerURL(container)
|
||||
case opt.SASURL != "":
|
||||
|
@ -332,7 +384,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
|||
return nil, errors.Wrapf(err, "failed to parse SAS URL")
|
||||
}
|
||||
// use anonymous credentials in case of sas url
|
||||
pipeline := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
||||
pipeline := f.newPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
||||
// Check if we have container level SAS or account level sas
|
||||
parts := azblob.NewBlobURLParts(*u)
|
||||
if parts.ContainerName != "" {
|
||||
|
@ -349,24 +401,9 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
|||
default:
|
||||
return nil, errors.New("Need account+key or connectionString or sasURL")
|
||||
}
|
||||
f.svcURL = &serviceURL
|
||||
f.cntURL = &containerURL
|
||||
|
||||
f := &Fs{
|
||||
name: name,
|
||||
opt: *opt,
|
||||
container: container,
|
||||
root: directory,
|
||||
svcURL: &serviceURL,
|
||||
cntURL: &containerURL,
|
||||
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
||||
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
|
||||
}
|
||||
f.features = (&fs.Features{
|
||||
ReadMimeType: true,
|
||||
WriteMimeType: true,
|
||||
BucketBased: true,
|
||||
SetTier: true,
|
||||
GetTier: true,
|
||||
}).Fill(f)
|
||||
if f.root != "" {
|
||||
f.root += "/"
|
||||
// Check to see if the (container,directory) is actually an existing file
|
||||
|
|
Loading…
Reference in a new issue