From 0427177857a8d60798c9496721982a04fb0a0961 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Wed, 16 Aug 2023 16:59:39 +0100
Subject: [PATCH] azureblob: implement OpenChunkWriter and multi-thread uploads #7056

This implements the OpenChunkWriter interface for azureblob which
enables multi-thread uploads.

This makes the memory pool controls of the azureblob backend
inoperative; they are replaced with the global ones:

    --azureblob-memory-pool-flush-time
    --azureblob-memory-pool-use-mmap

By using the buffered reader this fixes excessive memory use when
uploading large files as it shares memory pages between all readers.
---
 backend/azureblob/azureblob.go               | 552 ++++++++++---------
 backend/azureblob/azureblob_internal_test.go |  19 +-
 2 files changed, 298 insertions(+), 273 deletions(-)

diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go
index 2491efebc..4e36c09cb 100644
--- a/backend/azureblob/azureblob.go
+++ b/backend/azureblob/azureblob.go
@@ -5,7 +5,6 @@
 package azureblob

 import (
-	"bytes"
 	"context"
 	"crypto/md5"
 	"encoding/base64"
@@ -18,6 +17,7 @@ import (
 	"net/url"
 	"os"
 	"path"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -33,7 +33,6 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
 	"github.com/rclone/rclone/fs"
-	"github.com/rclone/rclone/fs/accounting"
 	"github.com/rclone/rclone/fs/chunksize"
 	"github.com/rclone/rclone/fs/config"
 	"github.com/rclone/rclone/fs/config/configmap"
@@ -46,10 +45,8 @@ import (
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/env"
+	"github.com/rclone/rclone/lib/multipart"
 	"github.com/rclone/rclone/lib/pacer"
-	"github.com/rclone/rclone/lib/pool"
-	"github.com/rclone/rclone/lib/readers"
-	"golang.org/x/sync/errgroup"
 )

 const (
@@ -70,8 +67,6 @@ const (
 	emulatorAccount      = "devstoreaccount1"
 	emulatorAccountKey   = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
 	emulatorBlobEndpoint = "http://127.0.0.1:10000/devstoreaccount1"
-	memoryPoolFlushTime  = fs.Duration(time.Minute) // flush the cached buffers after this long
-	memoryPoolUseMmap    = false
 )

 var (
@@ -337,17 +332,16 @@ to start uploading.`,
 			Advanced: true,
 		}, {
 			Name:     "memory_pool_flush_time",
-			Default:  memoryPoolFlushTime,
+			Default:  fs.Duration(time.Minute),
 			Advanced: true,
-			Help: `How often internal memory buffer pools will be flushed.
-
-Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations.
-This option controls how often unused buffers will be removed from the pool.`,
+			Hide:     fs.OptionHideBoth,
+			Help:     `How often internal memory buffer pools will be flushed. (no longer used)`,
 		}, {
 			Name:     "memory_pool_use_mmap",
-			Default:  memoryPoolUseMmap,
+			Default:  false,
 			Advanced: true,
-			Help: `Whether to use mmap buffers in internal memory pool.`,
+			Hide:     fs.OptionHideBoth,
+			Help:     `Whether to use mmap buffers in internal memory pool.
(no longer used)`, }, { Name: config.ConfigEncoding, Help: config.ConfigEncodingHelp, @@ -432,8 +426,6 @@ type Options struct { ArchiveTierDelete bool `config:"archive_tier_delete"` UseEmulator bool `config:"use_emulator"` DisableCheckSum bool `config:"disable_checksum"` - MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"` - MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"` Enc encoder.MultiEncoder `config:"encoding"` PublicAccess string `config:"public_access"` DirectoryMarkers bool `config:"directory_markers"` @@ -457,8 +449,6 @@ type Fs struct { cache *bucket.Cache // cache for container creation status pacer *fs.Pacer // To pace and retry the API calls uploadToken *pacer.TokenDispenser // control concurrency - pool *pool.Pool // memory pool - poolSize int64 // size of pages in memory pool publicAccess container.PublicAccessType // Container Public Access Level } @@ -671,13 +661,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e uploadToken: pacer.NewTokenDispenser(ci.Transfers), cache: bucket.NewCache(), cntSVCcache: make(map[string]*container.Client, 1), - pool: pool.New( - time.Duration(opt.MemoryPoolFlushTime), - int(opt.ChunkSize), - ci.Transfers, - opt.MemoryPoolUseMmap, - ), - poolSize: int64(opt.ChunkSize), } f.publicAccess = container.PublicAccessType(opt.PublicAccess) f.setRoot(root) @@ -1594,19 +1577,6 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, return f.NewObject(ctx, remote) } -func (f *Fs) getMemoryPool(size int64) *pool.Pool { - if size == int64(f.opt.ChunkSize) { - return f.pool - } - - return pool.New( - time.Duration(f.opt.MemoryPoolFlushTime), - int(size), - f.ci.Transfers, - f.opt.MemoryPoolUseMmap, - ) -} - // ------------------------------------------------------------ // Fs returns the parent Fs @@ -1982,8 +1952,8 @@ func (rs *readSeekCloser) Close() error { return nil } -// increment the slice passed in as LSB binary -func increment(xs []byte) { +// increment the array as LSB binary +func increment(xs *[8]byte) { for i, digit := range xs { newDigit := digit + 1 xs[i] = newDigit @@ -1994,22 +1964,43 @@ func increment(xs []byte) { } } -var warnStreamUpload sync.Once +// record chunk number and id for Close +type azBlock struct { + chunkNumber int + id string +} -// uploadMultipart uploads a file using multipart upload +// Implements the fs.ChunkWriter interface +type azChunkWriter struct { + chunkSize int64 + size int64 + f *Fs + ui uploadInfo + blocksMu sync.Mutex // protects the below + blocks []azBlock // list of blocks for finalize + binaryBlockID [8]byte // block counter as LSB first 8 bytes + o *Object +} + +// OpenChunkWriter returns the chunk size and a ChunkWriter // -// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList. 
-func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, blb *blockblob.Client, httpHeaders *blob.HTTPHeaders) (err error) { - // Calculate correct partSize - partSize := o.fs.opt.ChunkSize - totalParts := -1 - - // make concurrency machinery - concurrency := o.fs.opt.UploadConcurrency - if concurrency < 1 { - concurrency = 1 +// Pass in the remote and the src object +// You can also use options to hint at the desired chunk size +func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { + // Temporary Object under construction + o := &Object{ + fs: f, + remote: remote, } - tokens := pacer.NewTokenDispenser(concurrency) + ui, err := o.prepareUpload(ctx, src, options) + if err != nil { + return -1, nil, fmt.Errorf("failed to prepare upload: %w", err) + } + + // Calculate correct partSize + partSize := f.opt.ChunkSize + totalParts := -1 + size := src.Size() // Note that the max size of file is 4.75 TB (100 MB X 50,000 // blocks) and this is bigger than the max uncommitted block @@ -2023,13 +2014,13 @@ func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, // 195GB which seems like a not too unreasonable limit. if size == -1 { warnStreamUpload.Do(func() { - fs.Logf(o, "Streaming uploads using chunk size %v will have maximum file size of %v", - o.fs.opt.ChunkSize, partSize*fs.SizeSuffix(blockblob.MaxBlocks)) + fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v", + f.opt.ChunkSize, partSize*fs.SizeSuffix(blockblob.MaxBlocks)) }) } else { - partSize = chunksize.Calculator(o, size, blockblob.MaxBlocks, o.fs.opt.ChunkSize) + partSize = chunksize.Calculator(remote, size, blockblob.MaxBlocks, f.opt.ChunkSize) if partSize > fs.SizeSuffix(blockblob.MaxStageBlockBytes) { - return fmt.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), fs.SizeSuffix(blockblob.MaxBlocks), fs.SizeSuffix(blockblob.MaxStageBlockBytes)) + return -1, nil, fmt.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), fs.SizeSuffix(blockblob.MaxBlocks), fs.SizeSuffix(blockblob.MaxStageBlockBytes)) } totalParts = int(fs.SizeSuffix(size) / partSize) if fs.SizeSuffix(size)%partSize != 0 { @@ -2039,173 +2030,259 @@ func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, partSize) - // unwrap the accounting from the input, we use wrap to put it - // back on after the buffering - in, wrap := accounting.UnWrap(in) - - // FIXME it would be nice to delete uncommitted blocks - // See: https://github.com/rclone/rclone/issues/5583 - // - // However there doesn't seem to be an easy way of doing this other than - // by deleting the target. - // - // This means that a failed upload deletes the target which isn't ideal. - // - // Uploading a zero length blob and deleting it will remove the - // uncommitted blocks I think. - // - // Could check to see if a file exists already and if it - // doesn't then create a 0 length file and delete it to flush - // the uncommitted blocks. - // - // This is what azcopy does - // https://github.com/MicrosoftDocs/azure-docs/issues/36347#issuecomment-541457962 - // defer atexit.OnError(&err, func() { - // fs.Debugf(o, "Cancelling multipart upload") - // // Code goes here! 
- // })() - - // Upload the chunks - var ( - g, gCtx = errgroup.WithContext(ctx) - remaining = fs.SizeSuffix(size) // remaining size in file for logging only, -1 if size < 0 - position = fs.SizeSuffix(0) // position in file - memPool = o.fs.getMemoryPool(int64(partSize)) // pool to get memory from - finished = false // set when we have read EOF - blocks []string // list of blocks for finalize - binaryBlockID = make([]byte, 8) // block counter as LSB first 8 bytes - ) - for part := 0; !finished; part++ { - // Get a block of memory from the pool and a token which limits concurrency - tokens.Get() - buf := memPool.Get() - - free := func() { - memPool.Put(buf) // return the buf - tokens.Put() // return the token - } - - // Fail fast, in case an errgroup managed function returns an error - // gCtx is cancelled. There is no point in uploading all the other parts. - if gCtx.Err() != nil { - free() - break - } - - // Read the chunk - n, err := readers.ReadFill(in, buf) // this can never return 0, nil - if err == io.EOF { - if n == 0 { // end if no data - free() - break - } - finished = true - } else if err != nil { - free() - return fmt.Errorf("multipart upload failed to read source: %w", err) - } - buf = buf[:n] - - // increment the blockID and save the blocks for finalize - increment(binaryBlockID) - blockID := base64.StdEncoding.EncodeToString(binaryBlockID) - blocks = append(blocks, blockID) - - // Transfer the chunk - fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %d", part+1, totalParts, position, fs.SizeSuffix(size), len(buf)) - g.Go(func() (err error) { - defer free() - - // Upload the block, with MD5 for check - md5sum := md5.Sum(buf) - transactionalMD5 := md5sum[:] - err = o.fs.pacer.Call(func() (bool, error) { - bufferReader := bytes.NewReader(buf) - wrappedReader := wrap(bufferReader) - rs := readSeekCloser{wrappedReader, bufferReader} - options := blockblob.StageBlockOptions{ - // Specify the transactional md5 for the body, to be validated by the service. 
- TransactionalValidation: blob.TransferValidationTypeMD5(transactionalMD5), - } - _, err = blb.StageBlock(ctx, blockID, &rs, &options) - return o.fs.shouldRetry(ctx, err) - }) - if err != nil { - return fmt.Errorf("multipart upload failed to upload part: %w", err) - } - return nil - }) - - // ready for next block - if size >= 0 { - remaining -= partSize - } - position += partSize + chunkWriter := &azChunkWriter{ + chunkSize: int64(partSize), + size: size, + f: f, + ui: ui, + o: o, } - err = g.Wait() + fs.Debugf(o, "open chunk writer: started multipart upload") + return int64(partSize), chunkWriter, nil +} + +// WriteChunk will write chunk number with reader bytes, where chunk number >= 0 +func (w *azChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) { + if chunkNumber < 0 { + err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber) + return -1, err + } + + // Upload the block, with MD5 for check + m := md5.New() + currentChunkSize, err := io.Copy(m, reader) if err != nil { - return err + return -1, err } - - options := blockblob.CommitBlockListOptions{ - Metadata: o.getMetadata(), - Tier: parseTier(o.fs.opt.AccessTier), - HTTPHeaders: httpHeaders, + // If no data read, don't write the chunk + if currentChunkSize == 0 { + return 0, nil } + md5sum := m.Sum(nil) + transactionalMD5 := md5sum[:] - // Finalise the upload session - err = o.fs.pacer.Call(func() (bool, error) { - _, err := blb.CommitBlockList(ctx, blocks, &options) - return o.fs.shouldRetry(ctx, err) + // increment the blockID and save the blocks for finalize + increment(&w.binaryBlockID) + blockID := base64.StdEncoding.EncodeToString(w.binaryBlockID[:]) + + // Save the blockID for the commit + w.blocksMu.Lock() + w.blocks = append(w.blocks, azBlock{ + chunkNumber: chunkNumber, + id: blockID, + }) + w.blocksMu.Unlock() + + err = w.f.pacer.Call(func() (bool, error) { + // rewind the reader on retry and after reading md5 + _, err = reader.Seek(0, io.SeekStart) + if err != nil { + return false, err + } + options := blockblob.StageBlockOptions{ + // Specify the transactional md5 for the body, to be validated by the service. + TransactionalValidation: blob.TransferValidationTypeMD5(transactionalMD5), + } + _, err = w.ui.blb.StageBlock(ctx, blockID, &readSeekCloser{Reader: reader, Seeker: reader}, &options) + if err != nil { + if chunkNumber <= 8 { + return w.f.shouldRetry(ctx, err) + } + // retry all chunks once have done the first few + return true, err + } + return false, nil }) if err != nil { - return fmt.Errorf("multipart upload failed to finalize: %w", err) + return -1, fmt.Errorf("failed to upload chunk %d with %v bytes: %w", chunkNumber+1, currentChunkSize, err) } + + fs.Debugf(w.o, "multipart upload wrote chunk %d with %v bytes", chunkNumber+1, currentChunkSize) + return currentChunkSize, err +} + +// Abort the multpart upload. +// +// FIXME it would be nice to delete uncommitted blocks. +// +// See: https://github.com/rclone/rclone/issues/5583 +// +// However there doesn't seem to be an easy way of doing this other than +// by deleting the target. +// +// This means that a failed upload deletes the target which isn't ideal. +// +// Uploading a zero length blob and deleting it will remove the +// uncommitted blocks I think. +// +// Could check to see if a file exists already and if it doesn't then +// create a 0 length file and delete it to flush the uncommitted +// blocks. 
+// +// This is what azcopy does +// https://github.com/MicrosoftDocs/azure-docs/issues/36347#issuecomment-541457962 +func (w *azChunkWriter) Abort(ctx context.Context) error { + fs.Debugf(w.o, "multipart upload aborted (did nothing - see issue #5583)") return nil } +// Close and finalise the multipart upload +func (w *azChunkWriter) Close(ctx context.Context) (err error) { + // sort the completed parts by part number + sort.Slice(w.blocks, func(i, j int) bool { + return w.blocks[i].chunkNumber < w.blocks[j].chunkNumber + }) + + // Create a list of block IDs + blockIDs := make([]string, len(w.blocks)) + for i := range w.blocks { + blockIDs[i] = w.blocks[i].id + } + + options := blockblob.CommitBlockListOptions{ + Metadata: w.o.getMetadata(), + Tier: parseTier(w.f.opt.AccessTier), + HTTPHeaders: &w.ui.httpHeaders, + } + + // Finalise the upload session + err = w.f.pacer.Call(func() (bool, error) { + _, err := w.ui.blb.CommitBlockList(ctx, blockIDs, &options) + return w.f.shouldRetry(ctx, err) + }) + if err != nil { + return fmt.Errorf("failed to complete multipart upload: %w", err) + } + fs.Debugf(w.o, "multipart upload finished") + return err +} + +var warnStreamUpload sync.Once + +// uploadMultipart uploads a file using multipart upload +// +// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList. +func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (ui uploadInfo, err error) { + chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ + Open: o.fs, + Concurrency: o.fs.opt.UploadConcurrency, + OpenOptions: options, + //LeavePartsOnError: o.fs.opt.LeavePartsOnError, + }) + if err != nil { + return ui, err + } + return chunkWriter.(*azChunkWriter).ui, nil +} + // uploadSinglepart uploads a short blob using a single part upload -func (o *Object) uploadSinglepart(ctx context.Context, in io.Reader, size int64, blb *blockblob.Client, httpHeaders *blob.HTTPHeaders) (err error) { +func (o *Object) uploadSinglepart(ctx context.Context, in io.Reader, size int64, ui uploadInfo) (err error) { + chunkSize := int64(o.fs.opt.ChunkSize) // fs.Debugf(o, "Single part upload starting of object %d bytes", size) - if size > o.fs.poolSize || size < 0 { - return fmt.Errorf("internal error: single part upload size too big %d > %d", size, o.fs.opt.ChunkSize) + if size > chunkSize || size < 0 { + return fmt.Errorf("internal error: single part upload size too big %d > %d", size, chunkSize) } - buf := o.fs.pool.Get() - defer o.fs.pool.Put(buf) + rw := multipart.NewRW() + defer fs.CheckClose(rw, &err) - n, err := readers.ReadFill(in, buf) - if err == nil { - // Check to see whether in is exactly len(buf) or bigger - var buf2 = []byte{0} - n2, err2 := readers.ReadFill(in, buf2) - if n2 != 0 || err2 != io.EOF { - return fmt.Errorf("single part upload read failed: object longer than expected (expecting %d but got > %d)", size, len(buf)) - } - } + n, err := io.CopyN(rw, in, size+1) if err != nil && err != io.EOF { return fmt.Errorf("single part upload read failed: %w", err) } - if int64(n) != size { + if n != size { return fmt.Errorf("single part upload: expecting to read %d bytes but read %d", size, n) } - b := bytes.NewReader(buf[:n]) - rs := &readSeekCloser{Reader: b, Seeker: b} + rs := &readSeekCloser{Reader: rw, Seeker: rw} options := blockblob.UploadOptions{ Metadata: o.getMetadata(), Tier: parseTier(o.fs.opt.AccessTier), - HTTPHeaders: httpHeaders, + HTTPHeaders: &ui.httpHeaders, } - // Don't retry, 
return a retry error instead - return o.fs.pacer.CallNoRetry(func() (bool, error) { - _, err = blb.Upload(ctx, rs, &options) + return o.fs.pacer.Call(func() (bool, error) { + // rewind the reader on retry + _, err = rs.Seek(0, io.SeekStart) + if err != nil { + return false, err + } + _, err = ui.blb.Upload(ctx, rs, &options) return o.fs.shouldRetry(ctx, err) }) } +// Info needed for an upload +type uploadInfo struct { + blb *blockblob.Client + httpHeaders blob.HTTPHeaders + isDirMarker bool +} + +// Prepare the object for upload +func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (ui uploadInfo, err error) { + container, containerPath := o.split() + if container == "" || containerPath == "" { + return ui, fmt.Errorf("can't upload to root - need a container") + } + // Create parent dir/bucket if not saving directory marker + _, isDirMarker := o.meta[dirMetaKey] + if !isDirMarker { + err = o.fs.mkdirParent(ctx, o.remote) + if err != nil { + return ui, err + } + } + + // Update Mod time + o.updateMetadataWithModTime(src.ModTime(ctx)) + if err != nil { + return ui, err + } + + // Create the HTTP headers for the upload + ui.httpHeaders = blob.HTTPHeaders{ + BlobContentType: pString(fs.MimeType(ctx, src)), + } + + // Compute the Content-MD5 of the file. As we stream all uploads it + // will be set in PutBlockList API call using the 'x-ms-blob-content-md5' header + if !o.fs.opt.DisableCheckSum { + if sourceMD5, _ := src.Hash(ctx, hash.MD5); sourceMD5 != "" { + sourceMD5bytes, err := hex.DecodeString(sourceMD5) + if err == nil { + ui.httpHeaders.BlobContentMD5 = sourceMD5bytes + } else { + fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err) + } + } + } + + // Apply upload options (also allows one to overwrite content-type) + for _, option := range options { + key, value := option.Header() + lowerKey := strings.ToLower(key) + switch lowerKey { + case "": + // ignore + case "cache-control": + ui.httpHeaders.BlobCacheControl = pString(value) + case "content-disposition": + ui.httpHeaders.BlobContentDisposition = pString(value) + case "content-encoding": + ui.httpHeaders.BlobContentEncoding = pString(value) + case "content-language": + ui.httpHeaders.BlobContentLanguage = pString(value) + case "content-type": + ui.httpHeaders.BlobContentType = pString(value) + } + } + + ui.blb = o.fs.getBlockBlobSVC(container, containerPath) + return ui, nil +} + // Update the object with the contents of the io.Reader, modTime and size // // The new object may have been created if an error is returned @@ -2221,80 +2298,26 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op return errCantUpdateArchiveTierBlobs } } - container, containerPath := o.split() - if container == "" || containerPath == "" { - return fmt.Errorf("can't upload to root - need a container") - } - // Create parent dir/bucket if not saving directory marker - _, isDirMarker := o.meta[dirMetaKey] - if !isDirMarker { - err = o.fs.mkdirParent(ctx, o.remote) - if err != nil { - return err - } - } - // Update Mod time - fs.Debugf(nil, "o.meta = %+v", o.meta) - o.updateMetadataWithModTime(src.ModTime(ctx)) - if err != nil { - return err - } - - // Create the HTTP headers for the upload - httpHeaders := blob.HTTPHeaders{ - BlobContentType: pString(fs.MimeType(ctx, src)), - } - - // Compute the Content-MD5 of the file. 
As we stream all uploads it - // will be set in PutBlockList API call using the 'x-ms-blob-content-md5' header - if !o.fs.opt.DisableCheckSum { - if sourceMD5, _ := src.Hash(ctx, hash.MD5); sourceMD5 != "" { - sourceMD5bytes, err := hex.DecodeString(sourceMD5) - if err == nil { - httpHeaders.BlobContentMD5 = sourceMD5bytes - } else { - fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err) - } - } - } - - // Apply upload options (also allows one to overwrite content-type) - for _, option := range options { - key, value := option.Header() - lowerKey := strings.ToLower(key) - switch lowerKey { - case "": - // ignore - case "cache-control": - httpHeaders.BlobCacheControl = pString(value) - case "content-disposition": - httpHeaders.BlobContentDisposition = pString(value) - case "content-encoding": - httpHeaders.BlobContentEncoding = pString(value) - case "content-language": - httpHeaders.BlobContentLanguage = pString(value) - case "content-type": - httpHeaders.BlobContentType = pString(value) - } - } - - blb := o.fs.getBlockBlobSVC(container, containerPath) size := src.Size() - multipartUpload := size < 0 || size > o.fs.poolSize + multipartUpload := size < 0 || size > int64(o.fs.opt.ChunkSize) + var ui uploadInfo - fs.Debugf(nil, "o.meta = %+v", o.meta) if multipartUpload { - err = o.uploadMultipart(ctx, in, size, blb, &httpHeaders) + ui, err = o.uploadMultipart(ctx, in, src, options...) } else { - err = o.uploadSinglepart(ctx, in, size, blb, &httpHeaders) + ui, err = o.prepareUpload(ctx, src, options) + if err != nil { + return fmt.Errorf("failed to prepare upload: %w", err) + } + err = o.uploadSinglepart(ctx, in, size, ui) } if err != nil { return err } // Refresh metadata on object - if !isDirMarker { + if !ui.isDirMarker { o.clearMetaData() err = o.readMetaData(ctx) if err != nil { @@ -2383,13 +2406,14 @@ func parseTier(tier string) *blob.AccessTier { // Check the interfaces are satisfied var ( - _ fs.Fs = &Fs{} - _ fs.Copier = &Fs{} - _ fs.PutStreamer = &Fs{} - _ fs.Purger = &Fs{} - _ fs.ListRer = &Fs{} - _ fs.Object = &Object{} - _ fs.MimeTyper = &Object{} - _ fs.GetTierer = &Object{} - _ fs.SetTierer = &Object{} + _ fs.Fs = &Fs{} + _ fs.Copier = &Fs{} + _ fs.PutStreamer = &Fs{} + _ fs.Purger = &Fs{} + _ fs.ListRer = &Fs{} + _ fs.OpenChunkWriter = &Fs{} + _ fs.Object = &Object{} + _ fs.MimeTyper = &Object{} + _ fs.GetTierer = &Object{} + _ fs.SetTierer = &Object{} ) diff --git a/backend/azureblob/azureblob_internal_test.go b/backend/azureblob/azureblob_internal_test.go index 827653a80..1871fa265 100644 --- a/backend/azureblob/azureblob_internal_test.go +++ b/backend/azureblob/azureblob_internal_test.go @@ -20,17 +20,18 @@ func (f *Fs) InternalTest(t *testing.T) { func TestIncrement(t *testing.T) { for _, test := range []struct { - in []byte - want []byte + in [8]byte + want [8]byte }{ - {[]byte{0, 0, 0, 0}, []byte{1, 0, 0, 0}}, - {[]byte{0xFE, 0, 0, 0}, []byte{0xFF, 0, 0, 0}}, - {[]byte{0xFF, 0, 0, 0}, []byte{0, 1, 0, 0}}, - {[]byte{0, 1, 0, 0}, []byte{1, 1, 0, 0}}, - {[]byte{0xFF, 0xFF, 0xFF, 0xFE}, []byte{0, 0, 0, 0xFF}}, - {[]byte{0xFF, 0xFF, 0xFF, 0xFF}, []byte{0, 0, 0, 0}}, + {[8]byte{0, 0, 0, 0}, [8]byte{1, 0, 0, 0}}, + {[8]byte{0xFE, 0, 0, 0}, [8]byte{0xFF, 0, 0, 0}}, + {[8]byte{0xFF, 0, 0, 0}, [8]byte{0, 1, 0, 0}}, + {[8]byte{0, 1, 0, 0}, [8]byte{1, 1, 0, 0}}, + {[8]byte{0xFF, 0xFF, 0xFF, 0xFE}, [8]byte{0, 0, 0, 0xFF}}, + {[8]byte{0xFF, 0xFF, 0xFF, 0xFF}, [8]byte{0, 0, 0, 0, 1}}, + {[8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, [8]byte{0, 0, 0, 0, 0, 0, 0}}, } 
{
-		increment(test.in)
+		increment(&test.in)
 		assert.Equal(t, test.want, test.in)
 	}
 }
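
The shape of the new upload path: OpenChunkWriter prepares the upload and returns the chunk size plus an fs.ChunkWriter; the generic multipart.UploadMultipart helper then calls WriteChunk once per chunk (concurrently, up to --azureblob-upload-concurrency) and finishes with Close, or Abort if something failed. The sketch below is a simplified, sequential stand-in for that calling contract; the chunkWriter interface and memWriter type are local illustrations mirroring the methods shown above, not rclone's real fs.ChunkWriter or the azureblob implementation.

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
)

// chunkWriter mirrors the method set of the ChunkWriter used in this patch.
// It is a local stand-in for illustration only.
type chunkWriter interface {
	WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error)
	Close(ctx context.Context) error
	Abort(ctx context.Context) error
}

// memWriter is a toy implementation that just keeps the chunks in memory.
type memWriter struct {
	chunks map[int][]byte
}

func (m *memWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
	var buf bytes.Buffer
	n, err := io.Copy(&buf, reader)
	if err != nil {
		return -1, err
	}
	m.chunks[chunkNumber] = buf.Bytes()
	return n, nil
}

func (m *memWriter) Close(ctx context.Context) error { return nil }
func (m *memWriter) Abort(ctx context.Context) error { return nil }

// uploadChunks is a simplified, sequential version of what the generic
// multipart uploader does: cut the source into chunkSize pieces, write each
// one, then commit with Close, or Abort if anything failed.
func uploadChunks(ctx context.Context, w chunkWriter, src []byte, chunkSize int) (err error) {
	defer func() {
		if err != nil {
			_ = w.Abort(ctx) // best-effort cleanup on failure
		}
	}()
	for chunkNumber := 0; len(src) > 0; chunkNumber++ {
		n := chunkSize
		if n > len(src) {
			n = len(src)
		}
		// WriteChunk takes an io.ReadSeeker so the backend can rewind on retry.
		if _, err = w.WriteChunk(ctx, chunkNumber, bytes.NewReader(src[:n])); err != nil {
			return fmt.Errorf("chunk %d: %w", chunkNumber, err)
		}
		src = src[n:]
	}
	return w.Close(ctx)
}

func main() {
	w := &memWriter{chunks: map[int][]byte{}}
	if err := uploadChunks(context.Background(), w, []byte("hello multipart world"), 8); err != nil {
		panic(err)
	}
	fmt.Printf("uploaded %d chunks\n", len(w.chunks))
}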
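
OpenChunkWriter keeps the configured --azureblob-chunk-size unless the source would need more than blockblob.MaxBlocks (50,000) blocks, in which case chunksize.Calculator scales the part size up; the upload is rejected outright if even the largest allowed block size is not enough. Below is a rough standalone sketch of that sizing rule under the assumption that the calculator doubles the chunk size until it fits; it is not the real chunksize.Calculator, and the limits in main are illustrative parameters rather than the SDK constants.

package main

import (
	"errors"
	"fmt"
)

// choosePartSize is a simplified illustration of the sizing rule described
// above: keep defaultChunk unless size/maxBlocks forces a bigger part, and
// give up if even maxBlockSize cannot cover the file.
func choosePartSize(size, defaultChunk, maxBlockSize, maxBlocks int64) (int64, error) {
	part := defaultChunk
	// Scale the part size up until maxBlocks parts are enough.
	for part*maxBlocks < size {
		part *= 2
	}
	if part > maxBlockSize {
		return 0, errors.New("file too big: would need more than maxBlocks chunks of maxBlockSize")
	}
	return part, nil
}

func main() {
	const (
		MiB       = int64(1) << 20
		TiB       = int64(1) << 40
		maxBlocks = 50000 // block count limit per blob, as in the comment above
	)
	for _, size := range []int64{100 * MiB, 1 * TiB, 4 * TiB} {
		part, err := choosePartSize(size, 4*MiB, 4000*MiB, maxBlocks)
		if err != nil {
			fmt.Printf("size %d: %v\n", size, err)
			continue
		}
		fmt.Printf("size %d -> part size %d (up to %d blocks)\n", size, part, maxBlocks)
	}
}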
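
Block IDs are derived from an 8-byte little-endian counter: each WriteChunk bumps the counter with increment and base64-encodes it, giving the equal-length, valid-base64 block IDs that StageBlock and CommitBlockList require. A standalone illustration of that scheme (not the backend code itself):

package main

import (
	"encoding/base64"
	"fmt"
)

// increment treats the array as a little-endian (LSB first) counter,
// mirroring the increment function in this patch.
func increment(xs *[8]byte) {
	for i, digit := range xs {
		newDigit := digit + 1
		xs[i] = newDigit
		if newDigit >= digit {
			// exit if no carry
			break
		}
	}
}

func main() {
	var binaryBlockID [8]byte
	// Each chunk bumps the counter and base64-encodes it, producing the
	// fixed-length block IDs later passed to CommitBlockList.
	for chunk := 0; chunk < 3; chunk++ {
		increment(&binaryBlockID)
		blockID := base64.StdEncoding.EncodeToString(binaryBlockID[:])
		fmt.Printf("chunk %d -> block ID %q\n", chunk, blockID)
	}
}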
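
Within WriteChunk the chunk is read twice: once to compute the transactional MD5 that the service validates for the block, then again by StageBlock after seeking back to the start (the same rewind happens on every retry). The pattern in isolation, with a strings.Reader standing in for the chunk reader:

package main

import (
	"crypto/md5"
	"fmt"
	"io"
	"strings"
)

func main() {
	// A stand-in for the io.ReadSeeker passed to WriteChunk.
	reader := strings.NewReader("example chunk payload")

	// First pass: hash the chunk to get the transactional MD5 for this block.
	m := md5.New()
	size, err := io.Copy(m, reader)
	if err != nil {
		panic(err)
	}
	transactionalMD5 := m.Sum(nil)

	// Rewind before the (possibly retried) upload so the uploader sees the
	// whole chunk from the beginning each time.
	if _, err := reader.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}

	fmt.Printf("chunk of %d bytes, md5 %x\n", size, transactionalMD5)
}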