azureblob: implement streaming of unknown sized files

See: https://forum.rclone.org/t/rclone-rcat-azure-blob-container-sas-token-403-error/16286/3
This commit is contained in:
Nick Craig-Wood 2020-05-13 21:29:21 +01:00
parent 8c9c86c3d6
commit 4a1b644bfb
3 changed files with 128 additions and 105 deletions

View file

@ -9,7 +9,6 @@ import (
"context" "context"
"crypto/md5" "crypto/md5"
"encoding/base64" "encoding/base64"
"encoding/binary"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io" "io"
@ -36,6 +35,8 @@ import (
"github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool" "github.com/rclone/rclone/lib/pool"
"github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup"
) )
const ( const (
@ -857,6 +858,11 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
return fs, fs.Update(ctx, in, src, options...) return fs, fs.Update(ctx, in, src, options...)
} }
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.Put(ctx, in, src, options...)
}
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
func (f *Fs) Mkdir(ctx context.Context, dir string) error { func (f *Fs) Mkdir(ctx context.Context, dir string) error {
container, _ := f.split(dir) container, _ := f.split(dir)
@ -1279,141 +1285,140 @@ type readSeeker struct {
io.Seeker io.Seeker
} }
// increment the slice passed in as LSB binary
func increment(xs []byte) {
for i, digit := range xs {
newDigit := digit + 1
xs[i] = newDigit
if newDigit >= digit {
// exit if no carry
break
}
}
}
var warnStreamUpload sync.Once
// uploadMultipart uploads a file using multipart upload // uploadMultipart uploads a file using multipart upload
// //
// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList. // Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
func (o *Object) uploadMultipart(in io.Reader, size int64, blob *azblob.BlobURL, httpHeaders *azblob.BlobHTTPHeaders) (err error) { func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, blob *azblob.BlobURL, httpHeaders *azblob.BlobHTTPHeaders) (err error) {
// Calculate correct chunkSize // Calculate correct chunkSize
chunkSize := int64(o.fs.opt.ChunkSize) chunkSize := int64(o.fs.opt.ChunkSize)
var totalParts int64 totalParts := -1
for {
// Calculate number of parts // Note that the max size of file is 4.75 TB (100 MB X 50,000
var remainder int64 // blocks) and this is bigger than the max uncommitted block
totalParts, remainder = size/chunkSize, size%chunkSize // size (9.52 TB) so we do not need to part commit block lists
if remainder != 0 { // or garbage collect uncommitted blocks.
totalParts++ //
// See: https://docs.microsoft.com/en-gb/rest/api/storageservices/put-block
// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
// buffers here (default 4MB). With a maximum number of parts (50,000) this will be a file of
// 195GB which seems like a not too unreasonable limit.
if size == -1 {
warnStreamUpload.Do(func() {
fs.Logf(o, "Streaming uploads using chunk size %v will have maximum file size of %v",
o.fs.opt.ChunkSize, fs.SizeSuffix(chunkSize*maxTotalParts))
})
} else {
// Adjust partSize until the number of parts is small enough.
if size/chunkSize >= maxTotalParts {
// Calculate partition size rounded up to the nearest MB
chunkSize = (((size / maxTotalParts) >> 20) + 1) << 20
} }
if totalParts < maxTotalParts {
break
}
// Double chunk size if the number of parts is too big
chunkSize *= 2
if chunkSize > int64(maxChunkSize) { if chunkSize > int64(maxChunkSize) {
return errors.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), totalParts, fs.SizeSuffix(chunkSize/2)) return errors.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), totalParts, fs.SizeSuffix(chunkSize/2))
} }
totalParts = int(size / chunkSize)
if size%chunkSize != 0 {
totalParts++
}
} }
fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, fs.SizeSuffix(chunkSize)) fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, fs.SizeSuffix(chunkSize))
// https://godoc.org/github.com/Azure/azure-storage-blob-go/2017-07-29/azblob#example-BlockBlobURL
// Utilities are cloned from above example
// These helper functions convert a binary block ID to a base-64 string and vice versa
// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the block must be the same length
blockIDBinaryToBase64 := func(blockID []byte) string { return base64.StdEncoding.EncodeToString(blockID) }
// These helper functions convert an int block ID to a base-64 string and vice versa
blockIDIntToBase64 := func(blockID uint64) string {
binaryBlockID := (&[8]byte{})[:] // All block IDs are 8 bytes long
binary.LittleEndian.PutUint64(binaryBlockID, blockID)
return blockIDBinaryToBase64(binaryBlockID)
}
// block ID variables
var (
rawID uint64
blockID = "" // id in base64 encoded form
blocks []string
)
// increment the blockID
nextID := func() {
rawID++
blockID = blockIDIntToBase64(rawID)
blocks = append(blocks, blockID)
}
// Get BlockBlobURL, we will use default pipeline here
blockBlobURL := blob.ToBlockBlobURL()
ctx := context.Background()
ac := azblob.LeaseAccessConditions{} // Use default lease access conditions
// unwrap the accounting from the input, we use wrap to put it // unwrap the accounting from the input, we use wrap to put it
// back on after the buffering // back on after the buffering
in, wrap := accounting.UnWrap(in) in, wrap := accounting.UnWrap(in)
// Upload the chunks // Upload the chunks
remaining := size var (
position := int64(0) g, gCtx = errgroup.WithContext(ctx)
errs := make(chan error, 1) remaining = size // remaining size in file for logging only, -1 if size < 0
var wg sync.WaitGroup position = int64(0) // position in file
memPool := o.fs.getMemoryPool(chunkSize) memPool = o.fs.getMemoryPool(chunkSize) // pool to get memory from
outer: finished = false // set when we have read EOF
for part := 0; part < int(totalParts); part++ { blocks []string // list of blocks for finalize
// Check any errors blockBlobURL = blob.ToBlockBlobURL() // Get BlockBlobURL, we will use default pipeline here
select { ac = azblob.LeaseAccessConditions{} // Use default lease access conditions
case err = <-errs: binaryBlockID = make([]byte, 8) // block counter as LSB first 8 bytes
break outer )
default: for part := 0; !finished; part++ {
}
reqSize := remaining
if reqSize >= chunkSize {
reqSize = chunkSize
}
// Get a block of memory from the pool and a token which limits concurrency // Get a block of memory from the pool and a token which limits concurrency
o.fs.uploadToken.Get() o.fs.uploadToken.Get()
buf := memPool.Get() buf := memPool.Get()
buf = buf[:reqSize]
// Read the chunk free := func() {
_, err = io.ReadFull(in, buf)
if err != nil {
err = errors.Wrap(err, "multipart upload failed to read source")
memPool.Put(buf) // return the buf memPool.Put(buf) // return the buf
o.fs.uploadToken.Put() // return the token o.fs.uploadToken.Put() // return the token
break outer
} }
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
free()
break
}
// Read the chunk
n, err := readers.ReadFill(in, buf) // this can never return 0, nil
if err == io.EOF {
if n == 0 { // end if no data
free()
break
}
finished = true
} else if err != nil {
free()
return errors.Wrap(err, "multipart upload failed to read source")
}
buf = buf[:n]
// increment the blockID and save the blocks for finalize
increment(binaryBlockID)
blockID := base64.StdEncoding.EncodeToString(binaryBlockID)
blocks = append(blocks, blockID)
// Transfer the chunk // Transfer the chunk
nextID() fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
wg.Add(1) g.Go(func() (err error) {
go func(part int, position int64, blockID string) { defer free()
defer wg.Done()
defer o.fs.uploadToken.Put()
defer memPool.Put(buf)
fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
// Upload the block, with MD5 for check // Upload the block, with MD5 for check
md5sum := md5.Sum(buf) md5sum := md5.Sum(buf)
transactionalMD5 := md5sum[:] transactionalMD5 := md5sum[:]
err := o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
bufferReader := bytes.NewReader(buf) bufferReader := bytes.NewReader(buf)
wrappedReader := wrap(bufferReader) wrappedReader := wrap(bufferReader)
rs := readSeeker{wrappedReader, bufferReader} rs := readSeeker{wrappedReader, bufferReader}
_, err = blockBlobURL.StageBlock(ctx, blockID, &rs, ac, transactionalMD5) _, err = blockBlobURL.StageBlock(ctx, blockID, &rs, ac, transactionalMD5)
return o.fs.shouldRetry(err) return o.fs.shouldRetry(err)
}) })
if err != nil { if err != nil {
err = errors.Wrap(err, "multipart upload failed to upload part") return errors.Wrap(err, "multipart upload failed to upload part")
select {
case errs <- err:
default:
}
return
} }
}(part, position, blockID) return nil
})
// ready for next block // ready for next block
remaining -= chunkSize if size >= 0 {
remaining -= chunkSize
}
position += chunkSize position += chunkSize
} }
wg.Wait() err = g.Wait()
if err == nil {
select {
case err = <-errs:
default:
}
}
if err != nil { if err != nil {
return err return err
} }
@ -1473,7 +1478,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// is merged the SDK can't upload a single blob of exactly the chunk // is merged the SDK can't upload a single blob of exactly the chunk
// size, so upload with a multpart upload to work around. // size, so upload with a multpart upload to work around.
// See: https://github.com/rclone/rclone/issues/2653 // See: https://github.com/rclone/rclone/issues/2653
multipartUpload := size >= int64(o.fs.opt.UploadCutoff) multipartUpload := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
if size == int64(o.fs.opt.ChunkSize) { if size == int64(o.fs.opt.ChunkSize) {
multipartUpload = true multipartUpload = true
fs.Debugf(o, "Setting multipart upload for file of chunk size (%d) to work around SDK bug", size) fs.Debugf(o, "Setting multipart upload for file of chunk size (%d) to work around SDK bug", size)
@ -1483,7 +1488,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
err = o.fs.pacer.CallNoRetry(func() (bool, error) { err = o.fs.pacer.CallNoRetry(func() (bool, error) {
if multipartUpload { if multipartUpload {
// If a large file upload in chunks // If a large file upload in chunks
err = o.uploadMultipart(in, size, &blob, &httpHeaders) err = o.uploadMultipart(ctx, in, size, &blob, &httpHeaders)
} else { } else {
// Write a small blob in one transaction // Write a small blob in one transaction
blockBlobURL := blob.ToBlockBlobURL() blockBlobURL := blob.ToBlockBlobURL()
@ -1568,12 +1573,13 @@ func (o *Object) GetTier() string {
// Check the interfaces are satisfied // Check the interfaces are satisfied
var ( var (
_ fs.Fs = &Fs{} _ fs.Fs = &Fs{}
_ fs.Copier = &Fs{} _ fs.Copier = &Fs{}
_ fs.Purger = &Fs{} _ fs.PutStreamer = &Fs{}
_ fs.ListRer = &Fs{} _ fs.Purger = &Fs{}
_ fs.Object = &Object{} _ fs.ListRer = &Fs{}
_ fs.MimeTyper = &Object{} _ fs.Object = &Object{}
_ fs.GetTierer = &Object{} _ fs.MimeTyper = &Object{}
_ fs.SetTierer = &Object{} _ fs.GetTierer = &Object{}
_ fs.SetTierer = &Object{}
) )

View file

@ -16,3 +16,20 @@ func (f *Fs) InternalTest(t *testing.T) {
enabled = f.Features().GetTier enabled = f.Features().GetTier
assert.True(t, enabled) assert.True(t, enabled)
} }
func TestIncrement(t *testing.T) {
for _, test := range []struct {
in []byte
want []byte
}{
{[]byte{0, 0, 0, 0}, []byte{1, 0, 0, 0}},
{[]byte{0xFE, 0, 0, 0}, []byte{0xFF, 0, 0, 0}},
{[]byte{0xFF, 0, 0, 0}, []byte{0, 1, 0, 0}},
{[]byte{0, 1, 0, 0}, []byte{1, 1, 0, 0}},
{[]byte{0xFF, 0xFF, 0xFF, 0xFE}, []byte{0, 0, 0, 0xFF}},
{[]byte{0xFF, 0xFF, 0xFF, 0xFF}, []byte{0, 0, 0, 0}},
} {
increment(test.in)
assert.Equal(t, test.want, test.in)
}
}

View file

@ -336,7 +336,7 @@ operations more efficient.
| Mail.ru Cloud | Yes | Yes | Yes | Yes | Yes | No | No | Yes | Yes | Yes | | Mail.ru Cloud | Yes | Yes | Yes | Yes | Yes | No | No | Yes | Yes | Yes |
| Mega | Yes | No | Yes | Yes | Yes | No | No | No [#2178](https://github.com/rclone/rclone/issues/2178) | Yes | Yes | | Mega | Yes | No | Yes | Yes | Yes | No | No | No [#2178](https://github.com/rclone/rclone/issues/2178) | Yes | Yes |
| Memory | No | Yes | No | No | No | Yes | Yes | No | No | No | | Memory | No | Yes | No | No | No | Yes | Yes | No | No | No |
| Microsoft Azure Blob Storage | Yes | Yes | No | No | No | Yes | No | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | No | | Microsoft Azure Blob Storage | Yes | Yes | No | No | No | Yes | Yes | No [#2178](https://github.com/rclone/rclone/issues/2178) | No | No |
| Microsoft OneDrive | Yes | Yes | Yes | Yes | No [#575](https://github.com/rclone/rclone/issues/575) | No | No | Yes | Yes | Yes | | Microsoft OneDrive | Yes | Yes | Yes | Yes | No [#575](https://github.com/rclone/rclone/issues/575) | No | No | Yes | Yes | Yes |
| OpenDrive | Yes | Yes | Yes | Yes | No | No | No | No | No | Yes | | OpenDrive | Yes | Yes | Yes | Yes | No | No | No | No | No | Yes |
| Openstack Swift | Yes † | Yes | No | No | No | Yes | Yes | No [#2178](https://github.com/rclone/rclone/issues/2178) | Yes | No | | Openstack Swift | Yes † | Yes | No | No | No | Yes | Yes | No [#2178](https://github.com/rclone/rclone/issues/2178) | Yes | No |