From ec73d2fb9a72804e45c57b74717a9bd60ebfdc88 Mon Sep 17 00:00:00 2001
From: Denis Neuling <denisneuling@gmail.com>
Date: Tue, 10 Sep 2019 17:35:25 +0200
Subject: [PATCH] azure-blob-storage: utilize streaming capabilities - #1614

---
 backend/azureblob/azureblob.go      | 210 ++--------------------------
 backend/azureblob/azureblob_test.go |   5 -
 docs/content/azureblob.md           |  30 ----
 3 files changed, 11 insertions(+), 234 deletions(-)

diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go
index ff055b029..b18a2460b 100644
--- a/backend/azureblob/azureblob.go
+++ b/backend/azureblob/azureblob.go
@@ -5,9 +5,7 @@
 package azureblob
 
 import (
-	"bytes"
 	"context"
-	"crypto/md5"
 	"encoding/base64"
 	"encoding/hex"
 	"encoding/json"
@@ -26,7 +24,6 @@ import (
 	"github.com/Azure/go-autorest/autorest/adal"
 	"github.com/pkg/errors"
 	"github.com/rclone/rclone/fs"
-	"github.com/rclone/rclone/fs/accounting"
 	"github.com/rclone/rclone/fs/config"
 	"github.com/rclone/rclone/fs/config/configmap"
 	"github.com/rclone/rclone/fs/config/configstruct"
@@ -39,8 +36,6 @@ import (
 	"github.com/rclone/rclone/lib/env"
 	"github.com/rclone/rclone/lib/pacer"
 	"github.com/rclone/rclone/lib/pool"
-	"github.com/rclone/rclone/lib/readers"
-	"golang.org/x/sync/errgroup"
 )
 
 const (
@@ -51,15 +46,11 @@ const (
 	modTimeKey            = "mtime"
 	timeFormatIn          = time.RFC3339
 	timeFormatOut         = "2006-01-02T15:04:05.000000000Z07:00"
-	maxTotalParts         = 50000 // in multipart upload
 	storageDefaultBaseURL = "blob.core.windows.net"
-	// maxUncommittedSize = 9 << 30 // can't upload bigger than this
-	defaultChunkSize    = 4 * fs.MebiByte
-	maxChunkSize        = 100 * fs.MebiByte
-	defaultUploadCutoff = 256 * fs.MebiByte
-	maxUploadCutoff     = 256 * fs.MebiByte
-	defaultAccessTier   = azblob.AccessTierNone
-	maxTryTimeout       = time.Hour * 24 * 365 //max time of an azure web request response window (whether or not data is flowing)
+	defaultChunkSize      = 4 * fs.MebiByte
+	maxChunkSize          = 100 * fs.MebiByte
+	defaultAccessTier     = azblob.AccessTierNone
+	maxTryTimeout         = time.Hour * 24 * 365 //max time of an azure web request response window (whether or not data is flowing)
 	// Default storage account, key and blob endpoint for emulator support,
 	// though it is a base64 key checked in here, it is publicly available secret.
 	emulatorAccount      = "devstoreaccount1"
@@ -137,8 +128,7 @@ msi_client_id, or msi_mi_res_id parameters.`,
 			Advanced: true,
 		}, {
 			Name:     "upload_cutoff",
-			Help:     "Cutoff for switching to chunked upload (<= 256MB).",
-			Default:  defaultUploadCutoff,
+			Help:     "Cutoff for switching to chunked upload (<= 256MB). (Deprecated)",
 			Advanced: true,
 		}, {
 			Name: "chunk_size",
@@ -241,7 +231,6 @@ type Options struct {
 	MSIResourceID        string               `config:"msi_mi_res_id"`
 	Endpoint             string               `config:"endpoint"`
 	SASURL               string               `config:"sas_url"`
-	UploadCutoff         fs.SizeSuffix        `config:"upload_cutoff"`
 	ChunkSize            fs.SizeSuffix        `config:"chunk_size"`
 	ListChunkSize        uint                 `config:"list_chunk"`
 	AccessTier           string               `config:"access_tier"`
@@ -397,21 +386,6 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
 	return
 }
 
-func checkUploadCutoff(cs fs.SizeSuffix) error {
-	if cs > maxUploadCutoff {
-		return errors.Errorf("%v must be less than or equal to %v", cs, maxUploadCutoff)
-	}
-	return nil
-}
-
-func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
-	err = checkUploadCutoff(cs)
-	if err == nil {
-		old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
-	}
-	return
-}
-
 // httpClientFactory creates a Factory object that sends HTTP requests
 // to an rclone's http.Client.
 //
@@ -506,10 +480,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		return nil, err
 	}
 
-	err = checkUploadCutoff(opt.UploadCutoff)
-	if err != nil {
-		return nil, errors.Wrap(err, "azure: upload cutoff")
-	}
 	err = checkUploadChunkSize(opt.ChunkSize)
 	if err != nil {
 		return nil, errors.Wrap(err, "azure: chunk size")
@@ -1510,12 +1480,6 @@ func init() {
 	}
 }
 
-// readSeeker joins an io.Reader and an io.Seeker
-type readSeeker struct {
-	io.Reader
-	io.Seeker
-}
-
 // increment the slice passed in as LSB binary
 func increment(xs []byte) {
 	for i, digit := range xs {
@@ -1528,143 +1492,6 @@ func increment(xs []byte) {
 	}
 }
 
-var warnStreamUpload sync.Once
-
-// uploadMultipart uploads a file using multipart upload
-//
-// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
-func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, blob *azblob.BlobURL, httpHeaders *azblob.BlobHTTPHeaders) (err error) {
-	// Calculate correct chunkSize
-	chunkSize := int64(o.fs.opt.ChunkSize)
-	totalParts := -1
-
-	// Note that the max size of file is 4.75 TB (100 MB X 50,000
-	// blocks) and this is bigger than the max uncommitted block
-	// size (9.52 TB) so we do not need to part commit block lists
-	// or garbage collect uncommitted blocks.
-	//
-	// See: https://docs.microsoft.com/en-gb/rest/api/storageservices/put-block
-
-	// size can be -1 here meaning we don't know the size of the incoming file.  We use ChunkSize
-	// buffers here (default 4MB). With a maximum number of parts (50,000) this will be a file of
-	// 195GB which seems like a not too unreasonable limit.
-	if size == -1 {
-		warnStreamUpload.Do(func() {
-			fs.Logf(o, "Streaming uploads using chunk size %v will have maximum file size of %v",
-				o.fs.opt.ChunkSize, fs.SizeSuffix(chunkSize*maxTotalParts))
-		})
-	} else {
-		// Adjust partSize until the number of parts is small enough.
-		if size/chunkSize >= maxTotalParts {
-			// Calculate partition size rounded up to the nearest MB
-			chunkSize = (((size / maxTotalParts) >> 20) + 1) << 20
-		}
-		if chunkSize > int64(maxChunkSize) {
-			return errors.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), totalParts, fs.SizeSuffix(chunkSize/2))
-		}
-		totalParts = int(size / chunkSize)
-		if size%chunkSize != 0 {
-			totalParts++
-		}
-	}
-
-	fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, fs.SizeSuffix(chunkSize))
-
-	// unwrap the accounting from the input, we use wrap to put it
-	// back on after the buffering
-	in, wrap := accounting.UnWrap(in)
-
-	// Upload the chunks
-	var (
-		g, gCtx       = errgroup.WithContext(ctx)
-		remaining     = size                           // remaining size in file for logging only, -1 if size < 0
-		position      = int64(0)                       // position in file
-		memPool       = o.fs.getMemoryPool(chunkSize)  // pool to get memory from
-		finished      = false                          // set when we have read EOF
-		blocks        []string                         // list of blocks for finalize
-		blockBlobURL  = blob.ToBlockBlobURL()          // Get BlockBlobURL, we will use default pipeline here
-		ac            = azblob.LeaseAccessConditions{} // Use default lease access conditions
-		binaryBlockID = make([]byte, 8)                // block counter as LSB first 8 bytes
-	)
-	for part := 0; !finished; part++ {
-		// Get a block of memory from the pool and a token which limits concurrency
-		o.fs.uploadToken.Get()
-		buf := memPool.Get()
-
-		free := func() {
-			memPool.Put(buf)       // return the buf
-			o.fs.uploadToken.Put() // return the token
-		}
-
-		// Fail fast, in case an errgroup managed function returns an error
-		// gCtx is cancelled. There is no point in uploading all the other parts.
-		if gCtx.Err() != nil {
-			free()
-			break
-		}
-
-		// Read the chunk
-		n, err := readers.ReadFill(in, buf) // this can never return 0, nil
-		if err == io.EOF {
-			if n == 0 { // end if no data
-				free()
-				break
-			}
-			finished = true
-		} else if err != nil {
-			free()
-			return errors.Wrap(err, "multipart upload failed to read source")
-		}
-		buf = buf[:n]
-
-		// increment the blockID and save the blocks for finalize
-		increment(binaryBlockID)
-		blockID := base64.StdEncoding.EncodeToString(binaryBlockID)
-		blocks = append(blocks, blockID)
-
-		// Transfer the chunk
-		fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
-		g.Go(func() (err error) {
-			defer free()
-
-			// Upload the block, with MD5 for check
-			md5sum := md5.Sum(buf)
-			transactionalMD5 := md5sum[:]
-			err = o.fs.pacer.Call(func() (bool, error) {
-				bufferReader := bytes.NewReader(buf)
-				wrappedReader := wrap(bufferReader)
-				rs := readSeeker{wrappedReader, bufferReader}
-				_, err = blockBlobURL.StageBlock(ctx, blockID, &rs, ac, transactionalMD5, azblob.ClientProvidedKeyOptions{})
-				return o.fs.shouldRetry(err)
-			})
-			if err != nil {
-				return errors.Wrap(err, "multipart upload failed to upload part")
-			}
-			return nil
-		})
-
-		// ready for next block
-		if size >= 0 {
-			remaining -= chunkSize
-		}
-		position += chunkSize
-	}
-	err = g.Wait()
-	if err != nil {
-		return err
-	}
-
-	// Finalise the upload session
-	err = o.fs.pacer.Call(func() (bool, error) {
-		_, err := blockBlobURL.CommitBlockList(ctx, blocks, *httpHeaders, o.meta, azblob.BlobAccessConditions{}, azblob.AccessTierType(o.fs.opt.AccessTier), nil, azblob.ClientProvidedKeyOptions{})
-		return o.fs.shouldRetry(err)
-	})
-	if err != nil {
-		return errors.Wrap(err, "multipart upload failed to finalize")
-	}
-	return nil
-}
-
 // Update the object with the contents of the io.Reader, modTime and size
 //
 // The new object may have been created if an error is returned
@@ -1685,7 +1512,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	if err != nil {
 		return err
 	}
-	size := src.Size()
+
 	// Update Mod time
 	o.updateMetadataWithModTime(src.ModTime(ctx))
 	if err != nil {
@@ -1695,10 +1522,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	blob := o.getBlobReference()
 	httpHeaders := azblob.BlobHTTPHeaders{}
 	httpHeaders.ContentType = fs.MimeType(ctx, src)
-	// Compute the Content-MD5 of the file, for multiparts uploads it
+
+	// Compute the Content-MD5 of the file. As we stream all uploads it
 	// will be set in PutBlockList API call using the 'x-ms-blob-content-md5' header
-	// Note: If multipart, an MD5 checksum will also be computed for each uploaded block
-	// 		 in order to validate its integrity during transport
 	if !o.fs.opt.DisableCheckSum {
 		if sourceMD5, _ := src.Hash(ctx, hash.MD5); sourceMD5 != "" {
 			sourceMD5bytes, err := hex.DecodeString(sourceMD5)
@@ -1716,26 +1542,12 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 		Metadata:        o.meta,
 		BlobHTTPHeaders: httpHeaders,
 	}
-	// FIXME Until https://github.com/Azure/azure-storage-blob-go/pull/75
-	// is merged the SDK can't upload a single blob of exactly the chunk
-	// size, so upload with a multipart upload to work around.
-	// See: https://github.com/rclone/rclone/issues/2653
-	multipartUpload := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
-	if size == int64(o.fs.opt.ChunkSize) {
-		multipartUpload = true
-		fs.Debugf(o, "Setting multipart upload for file of chunk size (%d) to work around SDK bug", size)
-	}
 
 	// Don't retry, return a retry error instead
 	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
-		if multipartUpload {
-			// If a large file upload in chunks
-			err = o.uploadMultipart(ctx, in, size, &blob, &httpHeaders)
-		} else {
-			// Write a small blob in one transaction
-			blockBlobURL := blob.ToBlockBlobURL()
-			_, err = azblob.UploadStreamToBlockBlob(ctx, in, blockBlobURL, putBlobOptions)
-		}
+		// Stream contents of the reader object to the given blob URL
+		blockBlobURL := blob.ToBlockBlobURL()
+		_, err = azblob.UploadStreamToBlockBlob(ctx, in, blockBlobURL, putBlobOptions)
 		return o.fs.shouldRetry(err)
 	})
 	if err != nil {
diff --git a/backend/azureblob/azureblob_test.go b/backend/azureblob/azureblob_test.go
index 4d5930cdd..3f19a639d 100644
--- a/backend/azureblob/azureblob_test.go
+++ b/backend/azureblob/azureblob_test.go
@@ -29,13 +29,8 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
 	return f.setUploadChunkSize(cs)
 }
 
-func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
-	return f.setUploadCutoff(cs)
-}
-
 var (
 	_ fstests.SetUploadChunkSizer = (*Fs)(nil)
-	_ fstests.SetUploadCutoffer   = (*Fs)(nil)
 )
 
 // TestServicePrincipalFileSuccess checks that, given a proper JSON file, we can create a token.
diff --git a/docs/content/azureblob.md b/docs/content/azureblob.md
index c4d174995..a48f9c837 100644
--- a/docs/content/azureblob.md
+++ b/docs/content/azureblob.md
@@ -146,27 +146,6 @@ Container level SAS URLs are useful for temporarily allowing third
 parties access to a single container or putting credentials into an
 untrusted environment such as a CI build server.
 
-### Multipart uploads ###
-
-Rclone supports multipart uploads with Azure Blob storage.  Files
-bigger than 256MB will be uploaded using chunked upload by default.
-
-The files will be uploaded in parallel in 4MB chunks (by default).
-Note that these chunks are buffered in memory and there may be up to
-`--transfers` of them being uploaded at once.
-
-Files can't be split into more than 50,000 chunks so by default, so
-the largest file that can be uploaded with 4MB chunk size is 195GB.
-Above this rclone will double the chunk size until it creates less
-than 50,000 chunks.  By default this will mean a maximum file size of
-3.2TB can be uploaded.  This can be raised to 5TB using
-`--azureblob-chunk-size 100M`.
-
-Note that rclone doesn't commit the block list until the end of the
-upload which means that there is a limit of 9.5TB of multipart uploads
-in progress as Azure won't allow more than that amount of uncommitted
-blocks.
-
 {{< rem autogenerated options start" - DO NOT EDIT - instead edit fs.RegInfo in backend/azureblob/azureblob.go then run make backenddocs" >}}
 ### Standard Options
 
@@ -223,15 +202,6 @@ Leave blank normally.
 - Type:        string
 - Default:     ""
 
-#### --azureblob-upload-cutoff
-
-Cutoff for switching to chunked upload (<= 256MB).
-
-- Config:      upload_cutoff
-- Env Var:     RCLONE_AZUREBLOB_UPLOAD_CUTOFF
-- Type:        SizeSuffix
-- Default:     256M
-
 #### --azureblob-chunk-size
 
 Upload chunk size (<= 100MB).