backend/azureblob: Port new Azure Blob Storage SDK #2362

This change includes removing older azureblob storage SDK, and getting
parity to existing code with latest blob storage SDK.
This change is also pre-req for addressing #2091
This commit is contained in:
sandeepkru 2018-07-13 08:21:49 -07:00 committed by Nick Craig-Wood
parent 6efedc4043
commit 5ad8bcb43a
4 changed files with 309 additions and 242 deletions

View file

@ -1,9 +1,12 @@
// Package azureblob provides an interface to the Microsoft Azure blob object storage system
// +build !freebsd,!netbsd,!openbsd,!plan9,!solaris,go1.8
package azureblob
import (
"bytes"
"crypto/md5"
"context"
"encoding/base64"
"encoding/binary"
"encoding/hex"
@ -18,13 +21,12 @@ import (
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/azure-storage-blob-go/2018-03-28/azblob"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/accounting"
"github.com/ncw/rclone/fs/config"
"github.com/ncw/rclone/fs/config/flags"
"github.com/ncw/rclone/fs/fserrors"
"github.com/ncw/rclone/fs/fshttp"
"github.com/ncw/rclone/fs/hash"
"github.com/ncw/rclone/fs/walk"
"github.com/ncw/rclone/lib/pacer"
@ -32,15 +34,15 @@ import (
)
const (
apiVersion = "2017-04-17"
minSleep = 10 * time.Millisecond
maxSleep = 10 * time.Second
decayConstant = 1 // bigger for slower decay, exponential
listChunkSize = 5000 // number of items to read at once
modTimeKey = "mtime"
timeFormatIn = time.RFC3339
timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00"
maxTotalParts = 50000 // in multipart upload
minSleep = 10 * time.Millisecond
maxSleep = 10 * time.Second
decayConstant = 1 // bigger for slower decay, exponential
listChunkSize = 5000 // number of items to read at once
modTimeKey = "mtime"
timeFormatIn = time.RFC3339
timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00"
maxTotalParts = 50000 // in multipart upload
storageDefaultBaseURL = "blob.core.windows.net"
// maxUncommittedSize = 9 << 30 // can't upload bigger than this
)
@ -64,9 +66,6 @@ func init() {
}, {
Name: "key",
Help: "Storage Account Key (leave blank to use connection string or SAS URL)",
}, {
Name: "connection_string",
Help: "Connection string (leave blank if using account/key or SAS URL)",
}, {
Name: "sas_url",
Help: "SAS URL for container level access only\n(leave blank if using account/key or connection string)",
@ -82,13 +81,13 @@ func init() {
// Fs represents a remote azure server
type Fs struct {
name string // name of this remote
root string // the path we are working on if any
features *fs.Features // optional features
account string // account name
endpoint string // name of the starting api endpoint
bc *storage.BlobStorageClient
cc *storage.Container
name string // name of this remote
root string // the path we are working on if any
features *fs.Features // optional features
account string // account name
endpoint string // name of the starting api endpoint
svcURL *azblob.ServiceURL // reference to serviceURL
cntURL *azblob.ContainerURL // reference to containerURL
container string // the container we are working on
containerOKMu sync.Mutex // mutex to protect container OK
containerOK bool // true if we have created the container
@ -99,13 +98,14 @@ type Fs struct {
// Object describes a azure object
type Object struct {
fs *Fs // what this object is part of
remote string // The remote path
modTime time.Time // The modified time of the object if known
md5 string // MD5 hash if known
size int64 // Size of the object
mimeType string // Content-Type of the object
meta map[string]string // blob metadata
fs *Fs // what this object is part of
remote string // The remote path
modTime time.Time // The modified time of the object if known
md5 string // MD5 hash if known
size int64 // Size of the object
mimeType string // Content-Type of the object
accessTier azblob.AccessTierType // Blob Access Tier
meta map[string]string // blob metadata
}
// ------------------------------------------------------------
@ -165,8 +165,8 @@ var retryErrorCodes = []int{
// deserve to be retried. It returns the err as a convenience
func (f *Fs) shouldRetry(err error) (bool, error) {
// FIXME interpret special errors - more to do here
if storageErr, ok := err.(storage.AzureStorageServiceError); ok {
statusCode := storageErr.StatusCode
if storageErr, ok := err.(azblob.StorageError); ok {
statusCode := storageErr.Response().StatusCode
for _, e := range retryErrorCodes {
if statusCode == e {
return true, err
@ -190,44 +190,47 @@ func NewFs(name, root string) (fs.Fs, error) {
}
account := config.FileGet(name, "account")
key := config.FileGet(name, "key")
connectionString := config.FileGet(name, "connection_string")
sasURL := config.FileGet(name, "sas_url")
endpoint := config.FileGet(name, "endpoint", storage.DefaultBaseURL)
endpoint := config.FileGet(name, "endpoint", storageDefaultBaseURL)
var (
oclient storage.Client
client = &oclient
cc *storage.Container
u *url.URL
serviceURL azblob.ServiceURL
containerURL azblob.ContainerURL
)
switch {
case account != "" && key != "":
oclient, err = storage.NewClient(account, key, endpoint, apiVersion, true)
credential := azblob.NewSharedKeyCredential(account, key)
u, err = url.Parse(fmt.Sprintf("https://%s.%s", account, endpoint))
if err != nil {
return nil, errors.Wrap(err, "failed to make azure storage client from account/key")
}
case connectionString != "":
oclient, err = storage.NewClientFromConnectionString(connectionString)
if err != nil {
return nil, errors.Wrap(err, "failed to make azure storage client from connection string")
return nil, errors.Wrap(err, "failed to make azure storage url from account and endpoint")
}
pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{})
serviceURL = azblob.NewServiceURL(*u, pipeline)
containerURL = serviceURL.NewContainerURL(container)
case sasURL != "":
URL, err := url.Parse(sasURL)
u, err = url.Parse(sasURL)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse SAS URL")
}
cc, err = storage.GetContainerReferenceFromSASURI(*URL)
if err != nil {
return nil, errors.Wrapf(err, "failed to make azure storage client from SAS URL")
// use anonymous credentials in case of sas url
pipeline := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{})
// Check if we have container level SAS or account level sas
parts := azblob.NewBlobURLParts(*u)
if parts.ContainerName != "" {
if container != "" && parts.ContainerName != container {
return nil, errors.New("Container name in SAS URL and container provided in command do not match")
}
container = parts.ContainerName
containerURL = azblob.NewContainerURL(*u, pipeline)
} else {
serviceURL = azblob.NewServiceURL(*u, pipeline)
containerURL = serviceURL.NewContainerURL(container)
}
client = cc.Client()
default:
return nil, errors.New("Need account+key or connectionString or sasURL")
}
client.HTTPClient = fshttp.NewClient(fs.Config)
bc := client.GetBlobService()
if cc == nil {
cc = bc.GetContainerReference(container)
}
f := &Fs{
name: name,
@ -235,8 +238,8 @@ func NewFs(name, root string) (fs.Fs, error) {
root: directory,
account: account,
endpoint: endpoint,
bc: &bc,
cc: cc,
svcURL: &serviceURL,
cntURL: &containerURL,
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
}
@ -274,13 +277,13 @@ func NewFs(name, root string) (fs.Fs, error) {
// Return an Object from a path
//
// If it can't be found it returns the error fs.ErrorObjectNotFound.
func (f *Fs) newObjectWithInfo(remote string, info *storage.Blob) (fs.Object, error) {
func (f *Fs) newObjectWithInfo(remote string, info *azblob.BlobItem) (fs.Object, error) {
o := &Object{
fs: f,
remote: remote,
}
if info != nil {
err := o.decodeMetaData(info)
err := o.decodeMetaDataFromBlob(info)
if err != nil {
return nil, err
}
@ -300,13 +303,12 @@ func (f *Fs) NewObject(remote string) (fs.Object, error) {
}
// getBlobReference creates an empty blob reference with no metadata
func (f *Fs) getBlobReference(remote string) *storage.Blob {
return f.cc.GetBlobReference(f.root + remote)
func (f *Fs) getBlobReference(remote string) azblob.BlobURL {
return f.cntURL.NewBlobURL(f.root + remote)
}
// getBlobWithModTime adds the modTime passed in to o.meta and creates
// a Blob from it.
func (o *Object) getBlobWithModTime(modTime time.Time) *storage.Blob {
// updateMetadataWithModTime adds the modTime passed in to o.meta.
func (o *Object) updateMetadataWithModTime(modTime time.Time) {
// Make sure o.meta is not nil
if o.meta == nil {
o.meta = make(map[string]string, 1)
@ -314,14 +316,10 @@ func (o *Object) getBlobWithModTime(modTime time.Time) *storage.Blob {
// Set modTimeKey in it
o.meta[modTimeKey] = modTime.Format(timeFormatOut)
blob := o.getBlobReference()
blob.Metadata = o.meta
return blob
}
// listFn is called from list to handle an object
type listFn func(remote string, object *storage.Blob, isDirectory bool) error
type listFn func(remote string, object *azblob.BlobItem, isDirectory bool) error
// list lists the objects into the function supplied from
// the container and root supplied
@ -342,32 +340,39 @@ func (f *Fs) list(dir string, recurse bool, maxResults uint, fn listFn) error {
if !recurse {
delimiter = "/"
}
params := storage.ListBlobsParameters{
MaxResults: maxResults,
Prefix: root,
Delimiter: delimiter,
Include: &storage.IncludeBlobDataset{
Snapshots: false,
Metadata: true,
UncommittedBlobs: false,
options := azblob.ListBlobsSegmentOptions{
Details: azblob.BlobListingDetails{
Copy: false,
Metadata: true,
Snapshots: false,
UncommittedBlobs: false,
Deleted: false,
},
Prefix: root,
MaxResults: int32(maxResults),
}
for {
var response storage.BlobListResponse
ctx := context.Background()
for marker := (azblob.Marker{}); marker.NotDone(); {
var response *azblob.ListBlobsHierarchySegmentResponse
err := f.pacer.Call(func() (bool, error) {
var err error
response, err = f.cc.ListBlobs(params)
response, err = f.cntURL.ListBlobsHierarchySegment(ctx, marker, delimiter, options)
return f.shouldRetry(err)
})
if err != nil {
if storageErr, ok := err.(storage.AzureStorageServiceError); ok && storageErr.StatusCode == http.StatusNotFound {
// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
if storageErr, ok := err.(azblob.StorageError); ok && storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound {
return fs.ErrorDirNotFound
}
return err
}
for i := range response.Blobs {
file := &response.Blobs[i]
// Advance marker to next
marker = response.NextMarker
for i := range response.Segment.BlobItems {
file := &response.Segment.BlobItems[i]
// Finish if file name no longer has prefix
// if prefix != "" && !strings.HasPrefix(file.Name, prefix) {
// return nil
@ -389,8 +394,8 @@ func (f *Fs) list(dir string, recurse bool, maxResults uint, fn listFn) error {
}
}
// Send the subdirectories
for _, remote := range response.BlobPrefixes {
remote := strings.TrimRight(remote, "/")
for _, remote := range response.Segment.BlobPrefixes {
remote := strings.TrimRight(remote.Name, "/")
if !strings.HasPrefix(remote, f.root) {
fs.Debugf(f, "Odd directory name received %q", remote)
continue
@ -402,17 +407,12 @@ func (f *Fs) list(dir string, recurse bool, maxResults uint, fn listFn) error {
return err
}
}
// end if no NextFileName
if response.NextMarker == "" {
break
}
params.Marker = response.NextMarker
}
return nil
}
// Convert a list item into a DirEntry
func (f *Fs) itemToDirEntry(remote string, object *storage.Blob, isDirectory bool) (fs.DirEntry, error) {
func (f *Fs) itemToDirEntry(remote string, object *azblob.BlobItem, isDirectory bool) (fs.DirEntry, error) {
if isDirectory {
d := fs.NewDir(remote, time.Time{})
return d, nil
@ -436,7 +436,7 @@ func (f *Fs) markContainerOK() {
// listDir lists a single directory
func (f *Fs) listDir(dir string) (entries fs.DirEntries, err error) {
err = f.list(dir, false, listChunkSize, func(remote string, object *storage.Blob, isDirectory bool) error {
err = f.list(dir, false, listChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
entry, err := f.itemToDirEntry(remote, object, isDirectory)
if err != nil {
return err
@ -459,13 +459,8 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
if dir != "" {
return nil, fs.ErrorListBucketRequired
}
err = f.listContainersToFn(func(container *storage.Container) error {
t, err := time.Parse(time.RFC1123, container.Properties.LastModified)
if err != nil {
fs.Debugf(f, "Failed to parse LastModified %q: %v", container.Properties.LastModified, err)
t = time.Time{}
}
d := fs.NewDir(container.Name, t)
err = f.listContainersToFn(func(container *azblob.ContainerItem) error {
d := fs.NewDir(container.Name, container.Properties.LastModified)
entries = append(entries, d)
return nil
})
@ -512,7 +507,7 @@ func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {
return fs.ErrorListBucketRequired
}
list := walk.NewListRHelper(callback)
err = f.list(dir, true, listChunkSize, func(remote string, object *storage.Blob, isDirectory bool) error {
err = f.list(dir, true, listChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
entry, err := f.itemToDirEntry(remote, object, isDirectory)
if err != nil {
return err
@ -528,27 +523,34 @@ func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {
}
// listContainerFn is called from listContainersToFn to handle a container
type listContainerFn func(*storage.Container) error
type listContainerFn func(*azblob.ContainerItem) error
// listContainersToFn lists the containers to the function supplied
func (f *Fs) listContainersToFn(fn listContainerFn) error {
// FIXME page the containers if necessary?
params := storage.ListContainersParameters{}
var response *storage.ContainerListResponse
err := f.pacer.Call(func() (bool, error) {
var err error
response, err = f.bc.ListContainers(params)
return f.shouldRetry(err)
})
if err != nil {
return err
params := azblob.ListContainersSegmentOptions{
MaxResults: int32(listChunkSize),
}
for i := range response.Containers {
err = fn(&response.Containers[i])
ctx := context.Background()
for marker := (azblob.Marker{}); marker.NotDone(); {
var response *azblob.ListContainersResponse
err := f.pacer.Call(func() (bool, error) {
var err error
response, err = f.svcURL.ListContainersSegment(ctx, marker, params)
return f.shouldRetry(err)
})
if err != nil {
return err
}
for i := range response.ContainerItems {
err = fn(&response.ContainerItems[i])
if err != nil {
return err
}
}
marker = response.NextMarker
}
return nil
}
@ -573,32 +575,20 @@ func (f *Fs) Mkdir(dir string) error {
if f.containerOK {
return nil
}
// List the container to see if it exists
err := f.list("", false, 1, func(remote string, object *storage.Blob, isDirectory bool) error {
return nil
})
if err == nil {
f.markContainerOK()
return nil
}
// now try to create the container
options := storage.CreateContainerOptions{
Access: storage.ContainerAccessTypePrivate,
}
err = f.pacer.Call(func() (bool, error) {
err := f.cc.Create(&options)
err := f.pacer.Call(func() (bool, error) {
ctx := context.Background()
_, err := f.cntURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
if err != nil {
if storageErr, ok := err.(storage.AzureStorageServiceError); ok {
switch storageErr.StatusCode {
case http.StatusConflict:
switch storageErr.Code {
case "ContainerAlreadyExists":
f.containerOK = true
return false, nil
case "ContainerBeingDeleted":
f.containerDeleted = true
return true, err
}
if storageErr, ok := err.(azblob.StorageError); ok {
switch storageErr.ServiceCode() {
case azblob.ServiceCodeContainerAlreadyExists:
f.containerOK = true
return false, nil
case azblob.ServiceCodeContainerBeingDeleted:
f.containerDeleted = true
return true, err
}
}
}
@ -614,7 +604,7 @@ func (f *Fs) Mkdir(dir string) error {
// isEmpty checks to see if a given directory is empty and returns an error if not
func (f *Fs) isEmpty(dir string) (err error) {
empty := true
err = f.list("", true, 1, func(remote string, object *storage.Blob, isDirectory bool) error {
err = f.list("", true, 1, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
empty = false
return nil
})
@ -632,16 +622,23 @@ func (f *Fs) isEmpty(dir string) (err error) {
func (f *Fs) deleteContainer() error {
f.containerOKMu.Lock()
defer f.containerOKMu.Unlock()
options := storage.DeleteContainerOptions{}
options := azblob.ContainerAccessConditions{}
ctx := context.Background()
err := f.pacer.Call(func() (bool, error) {
exists, err := f.cc.Exists()
_, err := f.cntURL.GetProperties(ctx, azblob.LeaseAccessConditions{})
if err == nil {
_, err = f.cntURL.Delete(ctx, options)
}
if err != nil {
// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
if storageErr, ok := err.(azblob.StorageError); ok && storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound {
return false, fs.ErrorDirNotFound
}
return f.shouldRetry(err)
}
if !exists {
return false, fs.ErrorDirNotFound
}
err = f.cc.Delete(&options)
return f.shouldRetry(err)
})
if err == nil {
@ -704,17 +701,36 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
fs.Debugf(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy
}
dstBlob := f.getBlobReference(remote)
srcBlob := srcObj.getBlobReference()
options := storage.CopyOptions{}
sourceBlobURL := srcBlob.GetURL()
dstBlobURL := f.getBlobReference(remote)
srcBlobURL := srcObj.getBlobReference()
source, err := url.Parse(srcBlobURL.String())
if err != nil {
return nil, err
}
options := azblob.BlobAccessConditions{}
ctx := context.Background()
var startCopy *azblob.BlobStartCopyFromURLResponse
err = f.pacer.Call(func() (bool, error) {
err = dstBlob.Copy(sourceBlobURL, &options)
startCopy, err = dstBlobURL.StartCopyFromURL(ctx, *source, nil, options, options)
return f.shouldRetry(err)
})
if err != nil {
return nil, err
}
copyStatus := startCopy.CopyStatus()
for copyStatus == azblob.CopyStatusPending {
time.Sleep(1 * time.Second)
getMetadata, err := dstBlobURL.GetProperties(ctx, options)
if err != nil {
return nil, err
}
copyStatus = getMetadata.CopyStatus()
}
return f.NewObject(remote)
}
@ -759,22 +775,10 @@ func (o *Object) Size() int64 {
return o.size
}
// decodeMetaData sets the metadata from the data passed in
//
// Sets
// o.id
// o.modTime
// o.size
// o.md5
// o.meta
func (o *Object) decodeMetaData(info *storage.Blob) (err error) {
o.md5 = info.Properties.ContentMD5
o.mimeType = info.Properties.ContentType
o.size = info.Properties.ContentLength
o.modTime = time.Time(info.Properties.LastModified)
if len(info.Metadata) > 0 {
o.meta = info.Metadata
if modTime, ok := info.Metadata[modTimeKey]; ok {
func (o *Object) setMetadata(metadata azblob.Metadata) {
if len(metadata) > 0 {
o.meta = metadata
if modTime, ok := metadata[modTimeKey]; ok {
when, err := time.Parse(timeFormatIn, modTime)
if err != nil {
fs.Debugf(o, "Couldn't parse %v = %q: %v", modTimeKey, modTime, err)
@ -784,11 +788,42 @@ func (o *Object) decodeMetaData(info *storage.Blob) (err error) {
} else {
o.meta = nil
}
}
// decodeMetaDataFromPropertiesResponse sets the metadata from the data passed in
//
// Sets
// o.id
// o.modTime
// o.size
// o.md5
// o.meta
func (o *Object) decodeMetaDataFromPropertiesResponse(info *azblob.BlobGetPropertiesResponse) (err error) {
// NOTE - In BlobGetPropertiesResponse, Client library returns MD5 as base64 decoded string
// unlike BlobProperties in BlobItem (used in decodeMetadataFromBlob) which returns base64
// encoded bytes. Object needs to maintain this as base64 encoded string.
o.md5 = base64.StdEncoding.EncodeToString(info.ContentMD5())
o.mimeType = info.ContentType()
o.size = info.ContentLength()
o.modTime = time.Time(info.LastModified())
o.accessTier = azblob.AccessTierType(info.AccessTier())
o.setMetadata(info.NewMetadata())
return nil
}
func (o *Object) decodeMetaDataFromBlob(info *azblob.BlobItem) (err error) {
o.md5 = string(info.Properties.ContentMD5[:])
o.mimeType = *info.Properties.ContentType
o.size = *info.Properties.ContentLength
o.modTime = info.Properties.LastModified
o.accessTier = info.Properties.AccessTier
o.setMetadata(info.Metadata)
return nil
}
// getBlobReference creates an empty blob reference with no metadata
func (o *Object) getBlobReference() *storage.Blob {
func (o *Object) getBlobReference() azblob.BlobURL {
return o.fs.getBlobReference(o.remote)
}
@ -811,19 +846,22 @@ func (o *Object) readMetaData() (err error) {
blob := o.getBlobReference()
// Read metadata (this includes metadata)
getPropertiesOptions := storage.GetBlobPropertiesOptions{}
options := azblob.BlobAccessConditions{}
ctx := context.Background()
var blobProperties *azblob.BlobGetPropertiesResponse
err = o.fs.pacer.Call(func() (bool, error) {
err = blob.GetProperties(&getPropertiesOptions)
blobProperties, err = blob.GetProperties(ctx, options)
return o.fs.shouldRetry(err)
})
if err != nil {
if storageErr, ok := err.(storage.AzureStorageServiceError); ok && storageErr.StatusCode == http.StatusNotFound {
// On directories - GetProperties does not work and current SDK does not populate service code correctly hence check regular http response as well
if storageErr, ok := err.(azblob.StorageError); ok && storageErr.ServiceCode() == azblob.ServiceCodeBlobNotFound || storageErr.Response().StatusCode == http.StatusNotFound {
return fs.ErrorObjectNotFound
}
return err
}
return o.decodeMetaData(blob)
return o.decodeMetaDataFromPropertiesResponse(blobProperties)
}
// timeString returns modTime as the number of milliseconds
@ -860,10 +898,17 @@ func (o *Object) ModTime() (result time.Time) {
// SetModTime sets the modification time of the local fs object
func (o *Object) SetModTime(modTime time.Time) error {
blob := o.getBlobWithModTime(modTime)
options := storage.SetBlobMetadataOptions{}
// Make sure o.meta is not nil
if o.meta == nil {
o.meta = make(map[string]string, 1)
}
// Set modTimeKey in it
o.meta[modTimeKey] = modTime.Format(timeFormatOut)
blob := o.getBlobReference()
ctx := context.Background()
err := o.fs.pacer.Call(func() (bool, error) {
err := blob.SetMetadata(&options)
_, err := blob.SetMetadata(ctx, o.meta, azblob.BlobAccessConditions{})
return o.fs.shouldRetry(err)
})
if err != nil {
@ -880,29 +925,18 @@ func (o *Object) Storable() bool {
// Open an object for read
func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
getBlobOptions := storage.GetBlobOptions{}
getBlobRangeOptions := storage.GetBlobRangeOptions{
GetBlobOptions: &getBlobOptions,
}
// Offset and Count for range download
var offset int64
var count int64
for _, option := range options {
switch x := option.(type) {
case *fs.RangeOption:
start, end := x.Start, x.End
if end < 0 {
end = 0
}
if start < 0 {
start = o.size - end
end = 0
}
getBlobRangeOptions.Range = &storage.BlobRange{
Start: uint64(start),
End: uint64(end),
offset, count = x.Decode(o.size)
if count < 0 {
count = o.size - offset
}
case *fs.SeekOption:
getBlobRangeOptions.Range = &storage.BlobRange{
Start: uint64(x.Offset),
}
offset = x.Offset
default:
if option.Mandatory() {
fs.Logf(o, "Unsupported mandatory option: %v", option)
@ -910,17 +944,17 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
}
}
blob := o.getBlobReference()
ctx := context.Background()
ac := azblob.BlobAccessConditions{}
var dowloadResponse *azblob.DownloadResponse
err = o.fs.pacer.Call(func() (bool, error) {
if getBlobRangeOptions.Range == nil {
in, err = blob.Get(&getBlobOptions)
} else {
in, err = blob.GetRange(&getBlobRangeOptions)
}
dowloadResponse, err = blob.Download(ctx, offset, count, ac, false)
return o.fs.shouldRetry(err)
})
if err != nil {
return nil, errors.Wrap(err, "failed to open for download")
}
in = dowloadResponse.Body(azblob.RetryReaderOptions{})
return in, nil
}
@ -945,10 +979,16 @@ func init() {
}
}
// readSeeker joins an io.Reader and an io.Seeker
type readSeeker struct {
io.Reader
io.Seeker
}
// uploadMultipart uploads a file using multipart upload
//
// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
func (o *Object) uploadMultipart(in io.Reader, size int64, blob *storage.Blob, putBlobOptions *storage.PutBlobOptions) (err error) {
func (o *Object) uploadMultipart(in io.Reader, size int64, blob *azblob.BlobURL, httpHeaders *azblob.BlobHTTPHeaders) (err error) {
// Calculate correct chunkSize
chunkSize := int64(chunkSize)
var totalParts int64
@ -970,31 +1010,37 @@ func (o *Object) uploadMultipart(in io.Reader, size int64, blob *storage.Blob, p
}
fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, fs.SizeSuffix(chunkSize))
// Create an empty blob
err = o.fs.pacer.Call(func() (bool, error) {
err := blob.CreateBlockBlob(putBlobOptions)
return o.fs.shouldRetry(err)
})
// https://godoc.org/github.com/Azure/azure-storage-blob-go/2017-07-29/azblob#example-BlockBlobURL
// Utilities are cloned from above example
// These helper functions convert a binary block ID to a base-64 string and vice versa
// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the block must be the same length
blockIDBinaryToBase64 := func(blockID []byte) string { return base64.StdEncoding.EncodeToString(blockID) }
// These helper functions convert an int block ID to a base-64 string and vice versa
blockIDIntToBase64 := func(blockID uint64) string {
binaryBlockID := (&[8]byte{})[:] // All block IDs are 8 bytes long
binary.LittleEndian.PutUint64(binaryBlockID, blockID)
return blockIDBinaryToBase64(binaryBlockID)
}
// block ID variables
var (
rawID uint64
bytesID = make([]byte, 8)
blockID = "" // id in base64 encoded form
blocks = make([]storage.Block, 0, totalParts)
blocks = make([]string, totalParts)
)
// increment the blockID
nextID := func() {
rawID++
binary.LittleEndian.PutUint64(bytesID, rawID)
blockID = base64.StdEncoding.EncodeToString(bytesID)
blocks = append(blocks, storage.Block{
ID: blockID,
Status: storage.BlockStatusLatest,
})
blockID = blockIDIntToBase64(rawID)
blocks = append(blocks, blockID)
}
// Get BlockBlobURL, we will use default pipeline here
blockBlobURL := blob.ToBlockBlobURL()
ctx := context.Background()
ac := azblob.LeaseAccessConditions{} // Use default lease access conditions
// unwrap the accounting from the input, we use wrap to put it
// back on after the buffering
in, wrap := accounting.UnWrap(in)
@ -1037,13 +1083,11 @@ outer:
defer o.fs.uploadToken.Put()
fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
// Upload the block, with MD5 for check
md5sum := md5.Sum(buf)
putBlockOptions := storage.PutBlockOptions{
ContentMD5: base64.StdEncoding.EncodeToString(md5sum[:]),
}
err = o.fs.pacer.Call(func() (bool, error) {
err = blob.PutBlockWithLength(blockID, uint64(len(buf)), wrap(bytes.NewBuffer(buf)), &putBlockOptions)
bufferReader := bytes.NewReader(buf)
wrappedReader := wrap(bufferReader)
rs := readSeeker{wrappedReader, bufferReader}
_, err = blockBlobURL.StageBlock(ctx, blockID, rs, ac)
return o.fs.shouldRetry(err)
})
@ -1073,9 +1117,8 @@ outer:
}
// Finalise the upload session
putBlockListOptions := storage.PutBlockListOptions{}
err = o.fs.pacer.Call(func() (bool, error) {
err := blob.PutBlockList(blocks, &putBlockListOptions)
_, err := blockBlobURL.CommitBlockList(ctx, blocks, *httpHeaders, o.meta, azblob.BlobAccessConditions{})
return o.fs.shouldRetry(err)
})
if err != nil {
@ -1093,29 +1136,45 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
return err
}
size := src.Size()
blob := o.getBlobWithModTime(src.ModTime())
blob.Properties.ContentType = fs.MimeType(o)
if sourceMD5, _ := src.Hash(hash.MD5); sourceMD5 != "" {
sourceMD5bytes, err := hex.DecodeString(sourceMD5)
if err == nil {
blob.Properties.ContentMD5 = base64.StdEncoding.EncodeToString(sourceMD5bytes)
} else {
fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err)
// Update Mod time
o.updateMetadataWithModTime(src.ModTime())
if err != nil {
return err
}
blob := o.getBlobReference()
httpHeaders := azblob.BlobHTTPHeaders{}
httpHeaders.ContentType = fs.MimeType(o)
// Multipart upload doesn't support MD5 checksums at put block calls, hence calculate
// MD5 only for PutBlob requests
if size < int64(uploadCutoff) {
if sourceMD5, _ := src.Hash(hash.MD5); sourceMD5 != "" {
sourceMD5bytes, err := hex.DecodeString(sourceMD5)
if err == nil {
httpHeaders.ContentMD5 = sourceMD5bytes
} else {
fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err)
}
}
}
putBlobOptions := storage.PutBlobOptions{}
putBlobOptions := azblob.UploadStreamToBlockBlobOptions{
BufferSize: int(chunkSize),
MaxBuffers: 4,
Metadata: o.meta,
BlobHTTPHeaders: httpHeaders,
}
ctx := context.Background()
// Don't retry, return a retry error instead
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
if size >= int64(uploadCutoff) {
// If a large file upload in chunks
err = o.uploadMultipart(in, size, blob, &putBlobOptions)
err = o.uploadMultipart(in, size, &blob, &httpHeaders)
} else {
// Write a small blob in one transaction
if size == 0 {
in = nil
}
err = blob.CreateBlockBlobFromReader(in, &putBlobOptions)
blockBlobURL := blob.ToBlockBlobURL()
_, err = azblob.UploadStreamToBlockBlob(ctx, in, blockBlobURL, putBlobOptions)
}
return o.fs.shouldRetry(err)
})
@ -1129,9 +1188,11 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
// Remove an object
func (o *Object) Remove() error {
blob := o.getBlobReference()
options := storage.DeleteBlobOptions{}
snapShotOptions := azblob.DeleteSnapshotsOptionNone
ac := azblob.BlobAccessConditions{}
ctx := context.Background()
return o.fs.pacer.Call(func() (bool, error) {
err := blob.Delete(&options)
_, err := blob.Delete(ctx, snapShotOptions, ac)
return o.fs.shouldRetry(err)
})
}