azureblob: implement OpenChunkWriter and multi-thread uploads #7056
This implements the OpenChunkWriter interface for azureblob, which enables multi-thread uploads.

This makes the memory controls of the azureblob backend inoperative; they are replaced with the global ones:

--azureblob-memory-pool-flush-time
--azureblob-memory-pool-use-mmap

By using the buffered reader this fixes excessive memory use when uploading large files, as it will share memory pages between all readers.
This commit is contained in:
parent 3dfcfc2caa
commit 0427177857

2 changed files with 298 additions and 273 deletions
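For orientation, here is a minimal sketch of the chunk-writer contract this commit implements, inferred from the signatures visible in the diff below. The real definitions live in rclone's fs package; `ObjectInfo` and `OpenOption` here are placeholders standing in for the fs versions.

```go
// Sketch of the chunk-writer contract, inferred from this diff.
package sketch

import (
	"context"
	"io"
)

type ObjectInfo interface{ Size() int64 } // placeholder for fs.ObjectInfo
type OpenOption interface{}               // placeholder for fs.OpenOption

// OpenChunkWriter is what *Fs gains in this commit: it returns the chunk
// size the backend settled on plus a ChunkWriter for the upload.
type OpenChunkWriter interface {
	OpenChunkWriter(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (chunkSize int64, writer ChunkWriter, err error)
}

// ChunkWriter is driven concurrently by the generic uploader: chunks can
// finish out of order, so azChunkWriter below records them under a mutex
// and sorts them in Close before committing the block list.
type ChunkWriter interface {
	WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error)
	Close(ctx context.Context) error // commit the upload
	Abort(ctx context.Context) error // cancel the upload
}
```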
@@ -5,7 +5,6 @@
 package azureblob

 import (
-	"bytes"
 	"context"
 	"crypto/md5"
 	"encoding/base64"
@@ -18,6 +17,7 @@ import (
 	"net/url"
 	"os"
 	"path"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -33,7 +33,6 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
 	"github.com/rclone/rclone/fs"
-	"github.com/rclone/rclone/fs/accounting"
 	"github.com/rclone/rclone/fs/chunksize"
 	"github.com/rclone/rclone/fs/config"
 	"github.com/rclone/rclone/fs/config/configmap"
@@ -46,10 +45,8 @@ import (
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/env"
+	"github.com/rclone/rclone/lib/multipart"
 	"github.com/rclone/rclone/lib/pacer"
-	"github.com/rclone/rclone/lib/pool"
-	"github.com/rclone/rclone/lib/readers"
-	"golang.org/x/sync/errgroup"
 )

 const (
@@ -70,8 +67,6 @@ const (
 	emulatorAccount      = "devstoreaccount1"
 	emulatorAccountKey   = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
 	emulatorBlobEndpoint = "http://127.0.0.1:10000/devstoreaccount1"
-	memoryPoolFlushTime  = fs.Duration(time.Minute) // flush the cached buffers after this long
-	memoryPoolUseMmap    = false
 )

 var (
@@ -337,17 +332,16 @@ to start uploading.`,
 			Advanced: true,
 		}, {
 			Name:     "memory_pool_flush_time",
-			Default:  memoryPoolFlushTime,
+			Default:  fs.Duration(time.Minute),
 			Advanced: true,
-			Help: `How often internal memory buffer pools will be flushed.
-
-Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations.
-This option controls how often unused buffers will be removed from the pool.`,
+			Hide:     fs.OptionHideBoth,
+			Help:     `How often internal memory buffer pools will be flushed. (no longer used)`,
 		}, {
 			Name:     "memory_pool_use_mmap",
-			Default:  memoryPoolUseMmap,
+			Default:  false,
 			Advanced: true,
-			Help: `Whether to use mmap buffers in internal memory pool.`,
+			Hide:     fs.OptionHideBoth,
+			Help:     `Whether to use mmap buffers in internal memory pool. (no longer used)`,
 		}, {
 			Name:     config.ConfigEncoding,
 			Help:     config.ConfigEncodingHelp,
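Note the deprecation pattern here: rather than deleting the two memory_pool_* options outright, the commit keeps them registered but hides them with fs.OptionHideBoth and marks the help text "(no longer used)", so existing configs that still set these keys keep loading without errors.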
@@ -432,8 +426,6 @@ type Options struct {
 	ArchiveTierDelete    bool                 `config:"archive_tier_delete"`
 	UseEmulator          bool                 `config:"use_emulator"`
 	DisableCheckSum      bool                 `config:"disable_checksum"`
-	MemoryPoolFlushTime  fs.Duration          `config:"memory_pool_flush_time"`
-	MemoryPoolUseMmap    bool                 `config:"memory_pool_use_mmap"`
 	Enc                  encoder.MultiEncoder `config:"encoding"`
 	PublicAccess         string               `config:"public_access"`
 	DirectoryMarkers     bool                 `config:"directory_markers"`
@@ -457,8 +449,6 @@ type Fs struct {
 	cache        *bucket.Cache              // cache for container creation status
 	pacer        *fs.Pacer                  // To pace and retry the API calls
 	uploadToken  *pacer.TokenDispenser      // control concurrency
-	pool         *pool.Pool                 // memory pool
-	poolSize     int64                      // size of pages in memory pool
 	publicAccess container.PublicAccessType // Container Public Access Level
 }

@@ -671,13 +661,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
 		uploadToken: pacer.NewTokenDispenser(ci.Transfers),
 		cache:       bucket.NewCache(),
 		cntSVCcache: make(map[string]*container.Client, 1),
-		pool: pool.New(
-			time.Duration(opt.MemoryPoolFlushTime),
-			int(opt.ChunkSize),
-			ci.Transfers,
-			opt.MemoryPoolUseMmap,
-		),
-		poolSize: int64(opt.ChunkSize),
 	}
 	f.publicAccess = container.PublicAccessType(opt.PublicAccess)
 	f.setRoot(root)
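With the pool and poolSize fields gone, NewFs no longer pre-allocates a per-backend memory pool sized chunk_size x transfers. Per-chunk buffering instead comes from the shared machinery in lib/multipart (see uploadSinglepart further down), which is what lets memory pages be shared between all readers, as the commit message describes.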
@@ -1594,19 +1577,6 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
 	return f.NewObject(ctx, remote)
 }

-func (f *Fs) getMemoryPool(size int64) *pool.Pool {
-	if size == int64(f.opt.ChunkSize) {
-		return f.pool
-	}
-
-	return pool.New(
-		time.Duration(f.opt.MemoryPoolFlushTime),
-		int(size),
-		f.ci.Transfers,
-		f.opt.MemoryPoolUseMmap,
-	)
-}
-
 // ------------------------------------------------------------

 // Fs returns the parent Fs
@@ -1982,8 +1952,8 @@ func (rs *readSeekCloser) Close() error {
 	return nil
 }

-// increment the slice passed in as LSB binary
-func increment(xs []byte) {
+// increment the array as LSB binary
+func increment(xs *[8]byte) {
 	for i, digit := range xs {
 		newDigit := digit + 1
 		xs[i] = newDigit
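Azure block IDs within a blob must be unique and all the same length, which is why the counter is a fixed 8-byte array that gets base64-encoded into the block ID. A standalone illustration of the scheme; the carry check in the body below is the part of the function the diff elides between hunks:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// increment treats xs as an LSB-first (little-endian) counter, as in the
// diff above.
func increment(xs *[8]byte) {
	for i, digit := range xs {
		newDigit := digit + 1
		xs[i] = newDigit
		if newDigit >= digit {
			// exit if no carry
			break
		}
	}
}

func main() {
	var counter [8]byte
	for i := 0; i < 3; i++ {
		increment(&counter)
		// Fixed-width input means fixed-width base64 block IDs.
		fmt.Println(base64.StdEncoding.EncodeToString(counter[:]))
	}
	// Output:
	// AQAAAAAAAAA=
	// AgAAAAAAAAA=
	// AwAAAAAAAAA=
}
```

Switching the signature from []byte to *[8]byte makes the fixed width part of the type and lets azChunkWriter hold the counter by value.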
@@ -1994,22 +1964,43 @@ func increment(xs []byte) {
 	}
 }

-var warnStreamUpload sync.Once
+// record chunk number and id for Close
+type azBlock struct {
+	chunkNumber int
+	id          string
+}

-// uploadMultipart uploads a file using multipart upload
+// Implements the fs.ChunkWriter interface
+type azChunkWriter struct {
+	chunkSize     int64
+	size          int64
+	f             *Fs
+	ui            uploadInfo
+	blocksMu      sync.Mutex // protects the below
+	blocks        []azBlock  // list of blocks for finalize
+	binaryBlockID [8]byte    // block counter as LSB first 8 bytes
+	o             *Object
+}
+
+// OpenChunkWriter returns the chunk size and a ChunkWriter
 //
-// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
-func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, blb *blockblob.Client, httpHeaders *blob.HTTPHeaders) (err error) {
-	// Calculate correct partSize
-	partSize := o.fs.opt.ChunkSize
-	totalParts := -1
-
-	// make concurrency machinery
-	concurrency := o.fs.opt.UploadConcurrency
-	if concurrency < 1 {
-		concurrency = 1
-	}
-	tokens := pacer.NewTokenDispenser(concurrency)
+// Pass in the remote and the src object
+// You can also use options to hint at the desired chunk size
+func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) {
+	// Temporary Object under construction
+	o := &Object{
+		fs:     f,
+		remote: remote,
+	}
+	ui, err := o.prepareUpload(ctx, src, options)
+	if err != nil {
+		return -1, nil, fmt.Errorf("failed to prepare upload: %w", err)
+	}
+
+	// Calculate correct partSize
+	partSize := f.opt.ChunkSize
+	totalParts := -1
+	size := src.Size()

 	// Note that the max size of file is 4.75 TB (100 MB X 50,000
 	// blocks) and this is bigger than the max uncommitted block
@@ -2023,13 +2014,13 @@ func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, blb *blockblob.Client, httpHeaders *blob.HTTPHeaders) (err error) {
 	// 195GB which seems like a not too unreasonable limit.
 	if size == -1 {
 		warnStreamUpload.Do(func() {
-			fs.Logf(o, "Streaming uploads using chunk size %v will have maximum file size of %v",
-				o.fs.opt.ChunkSize, partSize*fs.SizeSuffix(blockblob.MaxBlocks))
+			fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
+				f.opt.ChunkSize, partSize*fs.SizeSuffix(blockblob.MaxBlocks))
 		})
 	} else {
-		partSize = chunksize.Calculator(o, size, blockblob.MaxBlocks, o.fs.opt.ChunkSize)
+		partSize = chunksize.Calculator(remote, size, blockblob.MaxBlocks, f.opt.ChunkSize)
 		if partSize > fs.SizeSuffix(blockblob.MaxStageBlockBytes) {
-			return fmt.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), fs.SizeSuffix(blockblob.MaxBlocks), fs.SizeSuffix(blockblob.MaxStageBlockBytes))
+			return -1, nil, fmt.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), fs.SizeSuffix(blockblob.MaxBlocks), fs.SizeSuffix(blockblob.MaxStageBlockBytes))
 		}
 		totalParts = int(fs.SizeSuffix(size) / partSize)
 		if fs.SizeSuffix(size)%partSize != 0 {
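The limits in these comments are easy to check by hand: a block blob allows at most 50,000 blocks, so streaming uploads of unknown size are capped at chunk_size x 50,000 (about 195 GiB assuming the default 4 MiB chunk_size), while known-size uploads get their chunk size scaled up. A rough sketch of that sizing logic; this is illustrative arithmetic only, not the real chunksize.Calculator from fs/chunksize:

```go
package main

import "fmt"

const (
	maxBlocks          = 50_000     // Azure block blob limit
	maxStageBlockBytes = 4000 << 20 // per-block limit, as in blockblob.MaxStageBlockBytes
	defaultChunkSize   = 4 << 20    // assumed default 4 MiB chunk_size
)

// minChunkSize doubles the chunk size until size fits in maxBlocks
// blocks - a simplified stand-in for chunksize.Calculator.
func minChunkSize(size int64) (int64, error) {
	chunk := int64(defaultChunkSize)
	for size/chunk >= maxBlocks {
		chunk *= 2
	}
	if chunk > maxStageBlockBytes {
		return 0, fmt.Errorf("file needs %d byte chunks, over the per-block limit", chunk)
	}
	return chunk, nil
}

func main() {
	fmt.Println(minChunkSize(100 << 30)) // 100 GiB fits with 4 MiB chunks
	fmt.Println(minChunkSize(1 << 40))   // 1 TiB needs 32 MiB chunks
	// Unknown-size (streaming) uploads stay at the default, hence the
	// warnStreamUpload message: 4 MiB x 50,000 blocks ~ 195 GiB maximum.
}
```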
@@ -2039,173 +2030,259 @@ func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, blb *blockblob.Client, httpHeaders *blob.HTTPHeaders) (err error) {
 	fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, partSize)

-	// unwrap the accounting from the input, we use wrap to put it
-	// back on after the buffering
-	in, wrap := accounting.UnWrap(in)
-
-	// FIXME it would be nice to delete uncommitted blocks
-	// See: https://github.com/rclone/rclone/issues/5583
-	//
-	// However there doesn't seem to be an easy way of doing this other than
-	// by deleting the target.
-	//
-	// This means that a failed upload deletes the target which isn't ideal.
-	//
-	// Uploading a zero length blob and deleting it will remove the
-	// uncommitted blocks I think.
-	//
-	// Could check to see if a file exists already and if it
-	// doesn't then create a 0 length file and delete it to flush
-	// the uncommitted blocks.
-	//
-	// This is what azcopy does
-	// https://github.com/MicrosoftDocs/azure-docs/issues/36347#issuecomment-541457962
-	// defer atexit.OnError(&err, func() {
-	// 	fs.Debugf(o, "Cancelling multipart upload")
-	// 	// Code goes here!
-	// })()
-
-	// Upload the chunks
-	var (
-		g, gCtx       = errgroup.WithContext(ctx)
-		remaining     = fs.SizeSuffix(size)                 // remaining size in file for logging only, -1 if size < 0
-		position      = fs.SizeSuffix(0)                    // position in file
-		memPool       = o.fs.getMemoryPool(int64(partSize)) // pool to get memory from
-		finished      = false                               // set when we have read EOF
-		blocks        []string                              // list of blocks for finalize
-		binaryBlockID = make([]byte, 8)                     // block counter as LSB first 8 bytes
-	)
-	for part := 0; !finished; part++ {
-		// Get a block of memory from the pool and a token which limits concurrency
-		tokens.Get()
-		buf := memPool.Get()
-
-		free := func() {
-			memPool.Put(buf) // return the buf
-			tokens.Put()     // return the token
-		}
-
-		// Fail fast, in case an errgroup managed function returns an error
-		// gCtx is cancelled. There is no point in uploading all the other parts.
-		if gCtx.Err() != nil {
-			free()
-			break
-		}
-
-		// Read the chunk
-		n, err := readers.ReadFill(in, buf) // this can never return 0, nil
-		if err == io.EOF {
-			if n == 0 { // end if no data
-				free()
-				break
-			}
-			finished = true
-		} else if err != nil {
-			free()
-			return fmt.Errorf("multipart upload failed to read source: %w", err)
-		}
-		buf = buf[:n]
-
-		// increment the blockID and save the blocks for finalize
-		increment(binaryBlockID)
-		blockID := base64.StdEncoding.EncodeToString(binaryBlockID)
-		blocks = append(blocks, blockID)
-
-		// Transfer the chunk
-		fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %d", part+1, totalParts, position, fs.SizeSuffix(size), len(buf))
-		g.Go(func() (err error) {
-			defer free()
-
-			// Upload the block, with MD5 for check
-			md5sum := md5.Sum(buf)
-			transactionalMD5 := md5sum[:]
-			err = o.fs.pacer.Call(func() (bool, error) {
-				bufferReader := bytes.NewReader(buf)
-				wrappedReader := wrap(bufferReader)
-				rs := readSeekCloser{wrappedReader, bufferReader}
-				options := blockblob.StageBlockOptions{
-					// Specify the transactional md5 for the body, to be validated by the service.
-					TransactionalValidation: blob.TransferValidationTypeMD5(transactionalMD5),
-				}
-				_, err = blb.StageBlock(ctx, blockID, &rs, &options)
-				return o.fs.shouldRetry(ctx, err)
-			})
-			if err != nil {
-				return fmt.Errorf("multipart upload failed to upload part: %w", err)
-			}
-			return nil
-		})
-
-		// ready for next block
-		if size >= 0 {
-			remaining -= partSize
-		}
-		position += partSize
-	}
-	err = g.Wait()
+	chunkWriter := &azChunkWriter{
+		chunkSize: int64(partSize),
+		size:      size,
+		f:         f,
+		ui:        ui,
+		o:         o,
+	}
+	fs.Debugf(o, "open chunk writer: started multipart upload")
+	return int64(partSize), chunkWriter, nil
+}
+
+// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
+func (w *azChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
+	if chunkNumber < 0 {
+		err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
+		return -1, err
+	}
+
+	// Upload the block, with MD5 for check
+	m := md5.New()
+	currentChunkSize, err := io.Copy(m, reader)
 	if err != nil {
-		return err
+		return -1, err
 	}
-
-	options := blockblob.CommitBlockListOptions{
-		Metadata:    o.getMetadata(),
-		Tier:        parseTier(o.fs.opt.AccessTier),
-		HTTPHeaders: httpHeaders,
+	// If no data read, don't write the chunk
+	if currentChunkSize == 0 {
+		return 0, nil
 	}
+	md5sum := m.Sum(nil)
+	transactionalMD5 := md5sum[:]

-	// Finalise the upload session
-	err = o.fs.pacer.Call(func() (bool, error) {
-		_, err := blb.CommitBlockList(ctx, blocks, &options)
-		return o.fs.shouldRetry(ctx, err)
+	// increment the blockID and save the blocks for finalize
+	increment(&w.binaryBlockID)
+	blockID := base64.StdEncoding.EncodeToString(w.binaryBlockID[:])
+
+	// Save the blockID for the commit
+	w.blocksMu.Lock()
+	w.blocks = append(w.blocks, azBlock{
+		chunkNumber: chunkNumber,
+		id:          blockID,
+	})
+	w.blocksMu.Unlock()
+
+	err = w.f.pacer.Call(func() (bool, error) {
+		// rewind the reader on retry and after reading md5
+		_, err = reader.Seek(0, io.SeekStart)
+		if err != nil {
+			return false, err
+		}
+		options := blockblob.StageBlockOptions{
+			// Specify the transactional md5 for the body, to be validated by the service.
+			TransactionalValidation: blob.TransferValidationTypeMD5(transactionalMD5),
+		}
+		_, err = w.ui.blb.StageBlock(ctx, blockID, &readSeekCloser{Reader: reader, Seeker: reader}, &options)
+		if err != nil {
+			if chunkNumber <= 8 {
+				return w.f.shouldRetry(ctx, err)
+			}
+			// retry all chunks once have done the first few
+			return true, err
+		}
+		return false, nil
 	})
 	if err != nil {
-		return fmt.Errorf("multipart upload failed to finalize: %w", err)
+		return -1, fmt.Errorf("failed to upload chunk %d with %v bytes: %w", chunkNumber+1, currentChunkSize, err)
 	}
+
+	fs.Debugf(w.o, "multipart upload wrote chunk %d with %v bytes", chunkNumber+1, currentChunkSize)
+	return currentChunkSize, err
+}
+
+// Abort the multpart upload.
+//
+// FIXME it would be nice to delete uncommitted blocks.
+//
+// See: https://github.com/rclone/rclone/issues/5583
+//
+// However there doesn't seem to be an easy way of doing this other than
+// by deleting the target.
+//
+// This means that a failed upload deletes the target which isn't ideal.
+//
+// Uploading a zero length blob and deleting it will remove the
+// uncommitted blocks I think.
+//
+// Could check to see if a file exists already and if it doesn't then
+// create a 0 length file and delete it to flush the uncommitted
+// blocks.
+//
+// This is what azcopy does
+// https://github.com/MicrosoftDocs/azure-docs/issues/36347#issuecomment-541457962
+func (w *azChunkWriter) Abort(ctx context.Context) error {
+	fs.Debugf(w.o, "multipart upload aborted (did nothing - see issue #5583)")
 	return nil
 }
+
+// Close and finalise the multipart upload
+func (w *azChunkWriter) Close(ctx context.Context) (err error) {
+	// sort the completed parts by part number
+	sort.Slice(w.blocks, func(i, j int) bool {
+		return w.blocks[i].chunkNumber < w.blocks[j].chunkNumber
+	})
+
+	// Create a list of block IDs
+	blockIDs := make([]string, len(w.blocks))
+	for i := range w.blocks {
+		blockIDs[i] = w.blocks[i].id
+	}
+
+	options := blockblob.CommitBlockListOptions{
+		Metadata:    w.o.getMetadata(),
+		Tier:        parseTier(w.f.opt.AccessTier),
+		HTTPHeaders: &w.ui.httpHeaders,
+	}
+
+	// Finalise the upload session
+	err = w.f.pacer.Call(func() (bool, error) {
+		_, err := w.ui.blb.CommitBlockList(ctx, blockIDs, &options)
+		return w.f.shouldRetry(ctx, err)
+	})
+	if err != nil {
+		return fmt.Errorf("failed to complete multipart upload: %w", err)
+	}
+	fs.Debugf(w.o, "multipart upload finished")
+	return err
+}
+
+var warnStreamUpload sync.Once
+
+// uploadMultipart uploads a file using multipart upload
+//
+// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
+func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (ui uploadInfo, err error) {
+	chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
+		Open:        o.fs,
+		Concurrency: o.fs.opt.UploadConcurrency,
+		OpenOptions: options,
+		//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
+	})
+	if err != nil {
+		return ui, err
+	}
+	return chunkWriter.(*azChunkWriter).ui, nil
+}

 // uploadSinglepart uploads a short blob using a single part upload
-func (o *Object) uploadSinglepart(ctx context.Context, in io.Reader, size int64, blb *blockblob.Client, httpHeaders *blob.HTTPHeaders) (err error) {
+func (o *Object) uploadSinglepart(ctx context.Context, in io.Reader, size int64, ui uploadInfo) (err error) {
+	chunkSize := int64(o.fs.opt.ChunkSize)
 	// fs.Debugf(o, "Single part upload starting of object %d bytes", size)
-	if size > o.fs.poolSize || size < 0 {
-		return fmt.Errorf("internal error: single part upload size too big %d > %d", size, o.fs.opt.ChunkSize)
+	if size > chunkSize || size < 0 {
+		return fmt.Errorf("internal error: single part upload size too big %d > %d", size, chunkSize)
 	}

-	buf := o.fs.pool.Get()
-	defer o.fs.pool.Put(buf)
+	rw := multipart.NewRW()
+	defer fs.CheckClose(rw, &err)

-	n, err := readers.ReadFill(in, buf)
-	if err == nil {
-		// Check to see whether in is exactly len(buf) or bigger
-		var buf2 = []byte{0}
-		n2, err2 := readers.ReadFill(in, buf2)
-		if n2 != 0 || err2 != io.EOF {
-			return fmt.Errorf("single part upload read failed: object longer than expected (expecting %d but got > %d)", size, len(buf))
-		}
-	}
+	n, err := io.CopyN(rw, in, size+1)
 	if err != nil && err != io.EOF {
 		return fmt.Errorf("single part upload read failed: %w", err)
 	}
-	if int64(n) != size {
+	if n != size {
 		return fmt.Errorf("single part upload: expecting to read %d bytes but read %d", size, n)
 	}

-	b := bytes.NewReader(buf[:n])
-	rs := &readSeekCloser{Reader: b, Seeker: b}
+	rs := &readSeekCloser{Reader: rw, Seeker: rw}

 	options := blockblob.UploadOptions{
 		Metadata:    o.getMetadata(),
 		Tier:        parseTier(o.fs.opt.AccessTier),
-		HTTPHeaders: httpHeaders,
+		HTTPHeaders: &ui.httpHeaders,
 	}

-	// Don't retry, return a retry error instead
-	return o.fs.pacer.CallNoRetry(func() (bool, error) {
-		_, err = blb.Upload(ctx, rs, &options)
+	return o.fs.pacer.Call(func() (bool, error) {
+		// rewind the reader on retry
+		_, err = rs.Seek(0, io.SeekStart)
+		if err != nil {
+			return false, err
+		}
+		_, err = ui.blb.Upload(ctx, rs, &options)
 		return o.fs.shouldRetry(ctx, err)
 	})
 }
+
+// Info needed for an upload
+type uploadInfo struct {
+	blb         *blockblob.Client
+	httpHeaders blob.HTTPHeaders
+	isDirMarker bool
+}
+
+// Prepare the object for upload
+func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (ui uploadInfo, err error) {
+	container, containerPath := o.split()
+	if container == "" || containerPath == "" {
+		return ui, fmt.Errorf("can't upload to root - need a container")
+	}
+	// Create parent dir/bucket if not saving directory marker
+	_, isDirMarker := o.meta[dirMetaKey]
+	if !isDirMarker {
+		err = o.fs.mkdirParent(ctx, o.remote)
+		if err != nil {
+			return ui, err
+		}
+	}
+
+	// Update Mod time
+	o.updateMetadataWithModTime(src.ModTime(ctx))
+	if err != nil {
+		return ui, err
+	}
+
+	// Create the HTTP headers for the upload
+	ui.httpHeaders = blob.HTTPHeaders{
+		BlobContentType: pString(fs.MimeType(ctx, src)),
+	}
+
+	// Compute the Content-MD5 of the file. As we stream all uploads it
+	// will be set in PutBlockList API call using the 'x-ms-blob-content-md5' header
+	if !o.fs.opt.DisableCheckSum {
+		if sourceMD5, _ := src.Hash(ctx, hash.MD5); sourceMD5 != "" {
+			sourceMD5bytes, err := hex.DecodeString(sourceMD5)
+			if err == nil {
+				ui.httpHeaders.BlobContentMD5 = sourceMD5bytes
+			} else {
+				fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err)
+			}
+		}
+	}
+
+	// Apply upload options (also allows one to overwrite content-type)
+	for _, option := range options {
+		key, value := option.Header()
+		lowerKey := strings.ToLower(key)
+		switch lowerKey {
+		case "":
+			// ignore
+		case "cache-control":
+			ui.httpHeaders.BlobCacheControl = pString(value)
+		case "content-disposition":
+			ui.httpHeaders.BlobContentDisposition = pString(value)
+		case "content-encoding":
+			ui.httpHeaders.BlobContentEncoding = pString(value)
+		case "content-language":
+			ui.httpHeaders.BlobContentLanguage = pString(value)
+		case "content-type":
+			ui.httpHeaders.BlobContentType = pString(value)
+		}
+	}
+
+	ui.blb = o.fs.getBlockBlobSVC(container, containerPath)
+	return ui, nil
+}

 // Update the object with the contents of the io.Reader, modTime and size
 //
 // The new object may have been created if an error is returned
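To see how the pieces fit: Update calls uploadMultipart, which hands off to the generic multipart.UploadMultipart; that in turn calls back into OpenChunkWriter and drives WriteChunk from several goroutines before Close commits the block list. Below is a simplified, hedged stand-in for that driver loop; the real one in lib/multipart adds accounting, buffer pooling, and pacing, and the names here are illustrative:

```go
package multipartsketch

import (
	"bytes"
	"context"
	"io"
	"sync"
)

// ChunkWriter mirrors the three methods azChunkWriter implements above.
type ChunkWriter interface {
	WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error)
	Close(ctx context.Context) error
	Abort(ctx context.Context) error
}

// driveUpload is a simplified stand-in for multipart.UploadMultipart:
// read the source sequentially, stage chunks concurrently, commit once
// with Close, and Abort on failure.
func driveUpload(ctx context.Context, w ChunkWriter, in io.Reader, chunkSize int64, concurrency int) (err error) {
	defer func() {
		if err != nil {
			_ = w.Abort(ctx) // best-effort cleanup (a no-op for azureblob, see above)
		}
	}()
	var (
		wg       sync.WaitGroup
		tokens   = make(chan struct{}, concurrency) // limits in-flight chunks
		mu       sync.Mutex
		chunkErr error
	)
	for chunkNumber := 0; ; chunkNumber++ {
		buf := make([]byte, chunkSize) // the real code draws from a shared pool
		n, readErr := io.ReadFull(in, buf)
		if n == 0 {
			if readErr != nil && readErr != io.EOF {
				wg.Wait()
				return readErr
			}
			break
		}
		tokens <- struct{}{}
		wg.Add(1)
		go func(chunkNumber int, chunk []byte) {
			defer wg.Done()
			defer func() { <-tokens }()
			// WriteChunk rewinds the reader itself before each attempt.
			if _, e := w.WriteChunk(ctx, chunkNumber, bytes.NewReader(chunk)); e != nil {
				mu.Lock()
				chunkErr = e
				mu.Unlock()
			}
		}(chunkNumber, buf[:n])
		if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
			break // source exhausted
		}
	}
	wg.Wait()
	if chunkErr != nil {
		return chunkErr
	}
	return w.Close(ctx) // CommitBlockList happens in here
}
```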
@@ -2221,80 +2298,26 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
 			return errCantUpdateArchiveTierBlobs
 		}
 	}
-	container, containerPath := o.split()
-	if container == "" || containerPath == "" {
-		return fmt.Errorf("can't upload to root - need a container")
-	}
-	// Create parent dir/bucket if not saving directory marker
-	_, isDirMarker := o.meta[dirMetaKey]
-	if !isDirMarker {
-		err = o.fs.mkdirParent(ctx, o.remote)
-		if err != nil {
-			return err
-		}
-	}
-
-	// Update Mod time
-	fs.Debugf(nil, "o.meta = %+v", o.meta)
-	o.updateMetadataWithModTime(src.ModTime(ctx))
-	if err != nil {
-		return err
-	}
-
-	// Create the HTTP headers for the upload
-	httpHeaders := blob.HTTPHeaders{
-		BlobContentType: pString(fs.MimeType(ctx, src)),
-	}
-
-	// Compute the Content-MD5 of the file. As we stream all uploads it
-	// will be set in PutBlockList API call using the 'x-ms-blob-content-md5' header
-	if !o.fs.opt.DisableCheckSum {
-		if sourceMD5, _ := src.Hash(ctx, hash.MD5); sourceMD5 != "" {
-			sourceMD5bytes, err := hex.DecodeString(sourceMD5)
-			if err == nil {
-				httpHeaders.BlobContentMD5 = sourceMD5bytes
-			} else {
-				fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err)
-			}
-		}
-	}
-
-	// Apply upload options (also allows one to overwrite content-type)
-	for _, option := range options {
-		key, value := option.Header()
-		lowerKey := strings.ToLower(key)
-		switch lowerKey {
-		case "":
-			// ignore
-		case "cache-control":
-			httpHeaders.BlobCacheControl = pString(value)
-		case "content-disposition":
-			httpHeaders.BlobContentDisposition = pString(value)
-		case "content-encoding":
-			httpHeaders.BlobContentEncoding = pString(value)
-		case "content-language":
-			httpHeaders.BlobContentLanguage = pString(value)
-		case "content-type":
-			httpHeaders.BlobContentType = pString(value)
-		}
-	}
-
-	blb := o.fs.getBlockBlobSVC(container, containerPath)
 	size := src.Size()
-	multipartUpload := size < 0 || size > o.fs.poolSize
+	multipartUpload := size < 0 || size > int64(o.fs.opt.ChunkSize)
+	var ui uploadInfo

-	fs.Debugf(nil, "o.meta = %+v", o.meta)
 	if multipartUpload {
-		err = o.uploadMultipart(ctx, in, size, blb, &httpHeaders)
+		ui, err = o.uploadMultipart(ctx, in, src, options...)
 	} else {
-		err = o.uploadSinglepart(ctx, in, size, blb, &httpHeaders)
+		ui, err = o.prepareUpload(ctx, src, options)
+		if err != nil {
+			return fmt.Errorf("failed to prepare upload: %w", err)
+		}
+		err = o.uploadSinglepart(ctx, in, size, ui)
 	}
 	if err != nil {
 		return err
 	}

 	// Refresh metadata on object
-	if !isDirMarker {
+	if !ui.isDirMarker {
 		o.clearMetaData()
 		err = o.readMetaData(ctx)
 		if err != nil {
@@ -2383,13 +2406,14 @@ func parseTier(tier string) *blob.AccessTier {

 // Check the interfaces are satisfied
 var (
 	_ fs.Fs          = &Fs{}
 	_ fs.Copier      = &Fs{}
 	_ fs.PutStreamer = &Fs{}
 	_ fs.Purger      = &Fs{}
 	_ fs.ListRer     = &Fs{}
+	_ fs.OpenChunkWriter = &Fs{}
 	_ fs.Object      = &Object{}
 	_ fs.MimeTyper   = &Object{}
 	_ fs.GetTierer   = &Object{}
 	_ fs.SetTierer   = &Object{}
 )
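The added `_ fs.OpenChunkWriter = &Fs{}` line is the usual Go compile-time assertion: if a later refactor breaks the OpenChunkWriter method set on *Fs, the backend stops compiling rather than silently losing multi-thread uploads.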
@@ -20,17 +20,18 @@ func (f *Fs) InternalTest(t *testing.T) {

 func TestIncrement(t *testing.T) {
 	for _, test := range []struct {
-		in   []byte
-		want []byte
+		in   [8]byte
+		want [8]byte
 	}{
-		{[]byte{0, 0, 0, 0}, []byte{1, 0, 0, 0}},
-		{[]byte{0xFE, 0, 0, 0}, []byte{0xFF, 0, 0, 0}},
-		{[]byte{0xFF, 0, 0, 0}, []byte{0, 1, 0, 0}},
-		{[]byte{0, 1, 0, 0}, []byte{1, 1, 0, 0}},
-		{[]byte{0xFF, 0xFF, 0xFF, 0xFE}, []byte{0, 0, 0, 0xFF}},
-		{[]byte{0xFF, 0xFF, 0xFF, 0xFF}, []byte{0, 0, 0, 0}},
+		{[8]byte{0, 0, 0, 0}, [8]byte{1, 0, 0, 0}},
+		{[8]byte{0xFE, 0, 0, 0}, [8]byte{0xFF, 0, 0, 0}},
+		{[8]byte{0xFF, 0, 0, 0}, [8]byte{0, 1, 0, 0}},
+		{[8]byte{0, 1, 0, 0}, [8]byte{1, 1, 0, 0}},
+		{[8]byte{0xFF, 0xFF, 0xFF, 0xFE}, [8]byte{0, 0, 0, 0xFF}},
+		{[8]byte{0xFF, 0xFF, 0xFF, 0xFF}, [8]byte{0, 0, 0, 0, 1}},
+		{[8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, [8]byte{0, 0, 0, 0, 0, 0, 0}},
 	} {
-		increment(test.in)
+		increment(&test.in)
 		assert.Equal(t, test.want, test.in)
 	}
 }