diff --git a/backend/b2/b2.go b/backend/b2/b2.go
index 211ec05e4..56d8ff117 100644
--- a/backend/b2/b2.go
+++ b/backend/b2/b2.go
@@ -32,6 +32,7 @@ import (
 	"github.com/rclone/rclone/fs/walk"
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
+	"github.com/rclone/rclone/lib/multipart"
 	"github.com/rclone/rclone/lib/pacer"
 	"github.com/rclone/rclone/lib/pool"
 	"github.com/rclone/rclone/lib/rest"
@@ -57,9 +58,7 @@ const (
 	minChunkSize        = 5 * fs.Mebi
 	defaultChunkSize    = 96 * fs.Mebi
 	defaultUploadCutoff = 200 * fs.Mebi
-	largeFileCopyCutoff = 4 * fs.Gibi              // 5E9 is the max
-	memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long
-	memoryPoolUseMmap   = false
+	largeFileCopyCutoff = 4 * fs.Gibi // 5E9 is the max
 )
 
 // Globals
@@ -149,6 +148,18 @@ might a maximum of "--transfers" chunks in progress at once.
 5,000,000 Bytes is the minimum size.`,
 			Default:  defaultChunkSize,
 			Advanced: true,
+		}, {
+			Name: "upload_concurrency",
+			Help: `Concurrency for multipart uploads.
+
+This is the number of chunks of the same file that are uploaded
+concurrently.
+
+Note that chunks are stored in memory and there may be up to
+"--transfers" * "--b2-upload-concurrency" chunks stored at once
+in memory.`,
+			Default:  16,
+			Advanced: true,
 		}, {
 			Name: "disable_checksum",
 			Help: `Disable checksums for large (> upload cutoff) files.
@@ -188,16 +199,16 @@ The minimum value is 1 second. The maximum value is one week.`,
 			Advanced: true,
 		}, {
 			Name:     "memory_pool_flush_time",
-			Default:  memoryPoolFlushTime,
+			Default:  fs.Duration(time.Minute),
 			Advanced: true,
-			Help: `How often internal memory buffer pools will be flushed.
-Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations.
-This option controls how often unused buffers will be removed from the pool.`,
+			Hide:     fs.OptionHideBoth,
+			Help:     `How often internal memory buffer pools will be flushed. (no longer used)`,
 		}, {
 			Name:     "memory_pool_use_mmap",
-			Default:  memoryPoolUseMmap,
+			Default:  false,
 			Advanced: true,
-			Help:     `Whether to use mmap buffers in internal memory pool.`,
+			Hide:     fs.OptionHideBoth,
+			Help:     `Whether to use mmap buffers in internal memory pool. (no longer used)`,
 		}, {
 			Name:     config.ConfigEncoding,
 			Help:     config.ConfigEncodingHelp,
@@ -224,11 +235,10 @@ type Options struct {
 	UploadCutoff                  fs.SizeSuffix        `config:"upload_cutoff"`
 	CopyCutoff                    fs.SizeSuffix        `config:"copy_cutoff"`
 	ChunkSize                     fs.SizeSuffix        `config:"chunk_size"`
+	UploadConcurrency             int                  `config:"upload_concurrency"`
 	DisableCheckSum               bool                 `config:"disable_checksum"`
 	DownloadURL                   string               `config:"download_url"`
 	DownloadAuthorizationDuration fs.Duration          `config:"download_auth_duration"`
-	MemoryPoolFlushTime           fs.Duration          `config:"memory_pool_flush_time"`
-	MemoryPoolUseMmap             bool                 `config:"memory_pool_use_mmap"`
 	Enc                           encoder.MultiEncoder `config:"encoding"`
 }
 
@@ -253,7 +263,6 @@ type Fs struct {
 	authMu      sync.Mutex            // lock for authorizing the account
 	pacer       *fs.Pacer             // To pace and retry the API calls
 	uploadToken *pacer.TokenDispenser // control concurrency
-	pool        *pool.Pool            // memory pool
 }
 
 // Object describes a b2 object
@@ -458,12 +467,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		uploads:     make(map[string][]*api.GetUploadURLResponse),
 		pacer:       fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
 		uploadToken: pacer.NewTokenDispenser(ci.Transfers),
-		pool: pool.New(
-			time.Duration(opt.MemoryPoolFlushTime),
-			int(opt.ChunkSize),
-			ci.Transfers,
-			opt.MemoryPoolUseMmap,
-		),
 	}
 	f.setRoot(root)
 	f.features = (&fs.Features{
@@ -597,23 +600,24 @@ func (f *Fs) clearUploadURL(bucketID string) {
 	f.uploadMu.Unlock()
 }
 
-// getBuf gets a buffer of f.opt.ChunkSize and an upload token
+// getRW gets a RW buffer and an upload token
 //
 // If noBuf is set then it just gets an upload token
-func (f *Fs) getBuf(noBuf bool) (buf []byte) {
+func (f *Fs) getRW(noBuf bool) (rw *pool.RW) {
 	f.uploadToken.Get()
 	if !noBuf {
-		buf = f.pool.Get()
+		rw = multipart.NewRW()
 	}
-	return buf
+	return rw
 }
 
-// putBuf returns a buffer to the memory pool and an upload token
+// putRW returns a RW buffer to the memory pool and returns an upload
+// token
 //
-// If noBuf is set then it just returns the upload token
-func (f *Fs) putBuf(buf []byte, noBuf bool) {
-	if !noBuf {
-		f.pool.Put(buf)
+// If buf is nil then it just returns the upload token
+func (f *Fs) putRW(rw *pool.RW) {
+	if rw != nil {
+		_ = rw.Close()
 	}
 	f.uploadToken.Put()
 }
@@ -1293,7 +1297,7 @@ func (f *Fs) copy(ctx context.Context, dstObj *Object, srcObj *Object, newInfo *
 		if err != nil {
 			return err
 		}
-		return up.Upload(ctx)
+		return up.Copy(ctx)
 	}
 
 	dstBucket, dstPath := dstObj.split()
@@ -1861,11 +1865,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	if err != nil {
 		return err
 	}
-	if size == -1 {
+	if size < 0 {
 		// Check if the file is large enough for a chunked upload (needs to be at least two chunks)
-		buf := o.fs.getBuf(false)
+		rw := o.fs.getRW(false)
 
-		n, err := io.ReadFull(in, buf)
+		n, err := io.CopyN(rw, in, int64(o.fs.opt.ChunkSize))
 		if err == nil {
 			bufReader := bufio.NewReader(in)
 			in = bufReader
@@ -1876,26 +1880,28 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 			fs.Debugf(o, "File is big enough for chunked streaming")
 			up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil)
 			if err != nil {
-				o.fs.putBuf(buf, false)
+				o.fs.putRW(rw)
 				return err
 			}
 			// NB Stream returns the buffer and token
-			return up.Stream(ctx, buf)
-		} else if err == io.EOF || err == io.ErrUnexpectedEOF {
+			return up.Stream(ctx, rw)
+		} else if err == io.EOF {
 			fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n)
-			defer o.fs.putBuf(buf, false)
-			size = int64(n)
-			in = bytes.NewReader(buf[:n])
+			defer o.fs.putRW(rw)
+			size = n
+			in = rw
 		} else {
-			o.fs.putBuf(buf, false)
+			o.fs.putRW(rw)
 			return err
 		}
 	} else if size > int64(o.fs.opt.UploadCutoff) {
-		up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil)
-		if err != nil {
-			return err
-		}
-		return up.Upload(ctx)
+		_, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
+			Open:        o.fs,
+			Concurrency: o.fs.opt.UploadConcurrency,
+			OpenOptions: options,
+			//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
+		})
+		return err
 	}
 
 	modTime := src.ModTime(ctx)
@@ -2003,6 +2009,36 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	return o.decodeMetaDataFileInfo(&response)
 }
 
+// OpenChunkWriter returns the chunk size and a ChunkWriter
+//
+// Pass in the remote and the src object
+// You can also use options to hint at the desired chunk size
+func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) {
+	// FIXME what if file is smaller than 1 chunk?
+	if f.opt.Versions {
+		return -1, nil, errNotWithVersions
+	}
+	if f.opt.VersionAt.IsSet() {
+		return -1, nil, errNotWithVersionAt
+	}
+	//size := src.Size()
+
+	// Temporary Object under construction
+	o := &Object{
+		fs:     f,
+		remote: src.Remote(),
+	}
+
+	bucket, _ := o.split()
+	err = f.makeBucket(ctx, bucket)
+	if err != nil {
+		return -1, nil, err
+	}
+
+	up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil)
+	return int64(f.opt.ChunkSize), up, err
+}
+
 // Remove an object
 func (o *Object) Remove(ctx context.Context) error {
 	bucket, bucketPath := o.split()
@@ -2030,14 +2066,15 @@ func (o *Object) ID() string {
 
 // Check the interfaces are satisfied
 var (
-	_ fs.Fs           = &Fs{}
-	_ fs.Purger       = &Fs{}
-	_ fs.Copier       = &Fs{}
-	_ fs.PutStreamer  = &Fs{}
-	_ fs.CleanUpper   = &Fs{}
-	_ fs.ListRer      = &Fs{}
-	_ fs.PublicLinker = &Fs{}
-	_ fs.Object       = &Object{}
-	_ fs.MimeTyper    = &Object{}
-	_ fs.IDer         = &Object{}
+	_ fs.Fs              = &Fs{}
+	_ fs.Purger          = &Fs{}
+	_ fs.Copier          = &Fs{}
+	_ fs.PutStreamer     = &Fs{}
+	_ fs.CleanUpper      = &Fs{}
+	_ fs.ListRer         = &Fs{}
+	_ fs.PublicLinker    = &Fs{}
+	_ fs.OpenChunkWriter = &Fs{}
+	_ fs.Object          = &Object{}
+	_ fs.MimeTyper       = &Object{}
+	_ fs.IDer            = &Object{}
 )
diff --git a/backend/b2/upload.go b/backend/b2/upload.go
index cdf8dbef0..fc843d12b 100644
--- a/backend/b2/upload.go
+++ b/backend/b2/upload.go
@@ -5,7 +5,6 @@
 package b2
 
 import (
-	"bytes"
 	"context"
 	"crypto/sha1"
 	"encoding/hex"
@@ -14,7 +13,6 @@ import (
 	"io"
 	"strings"
 	"sync"
-	"time"
 
 	"github.com/rclone/rclone/backend/b2/api"
 	"github.com/rclone/rclone/fs"
@@ -80,7 +78,8 @@ type largeUpload struct {
 	wrap      accounting.WrapFn               // account parts being transferred
 	id        string                          // ID of the file being uploaded
 	size      int64                           // total size
-	parts     int64                           // calculated number of parts, if known
+	parts     int                             // calculated number of parts, if known
+	sha1smu   sync.Mutex                      // mutex to protect sha1s
 	sha1s     []string                        // slice of SHA1s for each part
 	uploadMu  sync.Mutex                      // lock for upload variable
 	uploads   []*api.GetUploadPartURLResponse // result of get upload URL calls
@@ -93,18 +92,16 @@ type largeUpload struct {
 // If newInfo is set then metadata from that will be used instead of reading it from src
 func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, defaultChunkSize fs.SizeSuffix, doCopy bool, newInfo *api.File) (up *largeUpload, err error) {
 	size := src.Size()
-	parts := int64(0)
-	sha1SliceSize := int64(maxParts)
+	parts := 0
 	chunkSize := defaultChunkSize
 	if size == -1 {
 		fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", f.opt.ChunkSize, maxParts*f.opt.ChunkSize)
 	} else {
 		chunkSize = chunksize.Calculator(o, size, maxParts, defaultChunkSize)
-		parts = size / int64(chunkSize)
+		parts = int(size / int64(chunkSize))
 		if size%int64(chunkSize) != 0 {
 			parts++
 		}
-		sha1SliceSize = parts
 	}
 
 	opts := rest.Opts{
@@ -152,7 +149,7 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
 		id:        response.ID,
 		size:      size,
 		parts:     parts,
-		sha1s:     make([]string, sha1SliceSize),
+		sha1s:     make([]string, 0, 16),
 		chunkSize: int64(chunkSize),
 	}
 	// unwrap the accounting from the input, we use wrap to put it
@@ -203,10 +200,32 @@ func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) {
 	up.uploadMu.Unlock()
 }
 
-// Transfer a chunk
-func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byte) error {
-	err := up.f.pacer.Call(func() (bool, error) {
-		fs.Debugf(up.o, "Sending chunk %d length %d", part, len(body))
+// Add an sha1 to the being built up sha1s
+func (up *largeUpload) addSha1(chunkNumber int, sha1 string) {
+	up.sha1smu.Lock()
+	defer up.sha1smu.Unlock()
+	if len(up.sha1s) < chunkNumber+1 {
+		up.sha1s = append(up.sha1s, make([]string, chunkNumber+1-len(up.sha1s))...)
+	}
+	up.sha1s[chunkNumber] = sha1
+}
+
+// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
+func (up *largeUpload) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (size int64, err error) {
+	err = up.f.pacer.Call(func() (bool, error) {
+		// Discover the size by seeking to the end
+		size, err = reader.Seek(0, io.SeekEnd)
+		if err != nil {
+			return false, err
+		}
+
+		// rewind the reader on retry and after reading size
+		_, err = reader.Seek(0, io.SeekStart)
+		if err != nil {
+			return false, err
+		}
+
+		fs.Debugf(up.o, "Sending chunk %d length %d", chunkNumber, size)
 
 		// Get upload URL
 		upload, err := up.getUploadURL(ctx)
@@ -214,8 +233,8 @@ func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byt
 			return false, err
 		}
 
-		in := newHashAppendingReader(bytes.NewReader(body), sha1.New())
-		size := int64(len(body)) + int64(in.AdditionalLength())
+		in := newHashAppendingReader(reader, sha1.New())
+		size += int64(in.AdditionalLength())
 
 		// Authorization
 		//
@@ -245,7 +264,7 @@ func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byt
 			Body:   up.wrap(in),
 			ExtraHeaders: map[string]string{
 				"Authorization":    upload.AuthorizationToken,
-				"X-Bz-Part-Number": fmt.Sprintf("%d", part),
+				"X-Bz-Part-Number": fmt.Sprintf("%d", chunkNumber+1),
 				sha1Header:         "hex_digits_at_end",
 			},
 			ContentLength: &size,
@@ -256,7 +275,7 @@ func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byt
 		resp, err := up.f.srv.CallJSON(ctx, &opts, nil, &response)
 		retry, err := up.f.shouldRetry(ctx, resp, err)
 		if err != nil {
-			fs.Debugf(up.o, "Error sending chunk %d (retry=%v): %v: %#v", part, retry, err, err)
+			fs.Debugf(up.o, "Error sending chunk %d (retry=%v): %v: %#v", chunkNumber, retry, err, err)
 		}
 		// On retryable error clear PartUploadURL
 		if retry {
@@ -264,30 +283,30 @@ func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byt
 			upload = nil
 		}
 		up.returnUploadURL(upload)
-		up.sha1s[part-1] = in.HexSum()
+		up.addSha1(chunkNumber, in.HexSum())
 		return retry, err
 	})
 	if err != nil {
-		fs.Debugf(up.o, "Error sending chunk %d: %v", part, err)
+		fs.Debugf(up.o, "Error sending chunk %d: %v", chunkNumber, err)
 	} else {
-		fs.Debugf(up.o, "Done sending chunk %d", part)
+		fs.Debugf(up.o, "Done sending chunk %d", chunkNumber)
 	}
-	return err
+	return size, err
 }
 
 // Copy a chunk
-func (up *largeUpload) copyChunk(ctx context.Context, part int64, partSize int64) error {
+func (up *largeUpload) copyChunk(ctx context.Context, part int, partSize int64) error {
 	err := up.f.pacer.Call(func() (bool, error) {
 		fs.Debugf(up.o, "Copying chunk %d length %d", part, partSize)
 		opts := rest.Opts{
 			Method: "POST",
 			Path:   "/b2_copy_part",
 		}
-		offset := (part - 1) * up.chunkSize // where we are in the source file
+		offset := int64(part) * up.chunkSize // where we are in the source file
 		var request = api.CopyPartRequest{
 			SourceID:    up.src.id,
 			LargeFileID: up.id,
-			PartNumber:  part,
+			PartNumber:  int64(part + 1),
 			Range:       fmt.Sprintf("bytes=%d-%d", offset, offset+partSize-1),
 		}
 		var response api.UploadPartResponse
@@ -296,7 +315,7 @@ func (up *largeUpload) copyChunk(ctx context.Context, part int64, partSize int64
 		if err != nil {
 			fs.Debugf(up.o, "Error copying chunk %d (retry=%v): %v: %#v", part, retry, err, err)
 		}
-		up.sha1s[part-1] = response.SHA1
+		up.addSha1(part, response.SHA1)
 		return retry, err
 	})
 	if err != nil {
@@ -307,8 +326,8 @@ func (up *largeUpload) copyChunk(ctx context.Context, part int64, partSize int64
 	return err
 }
 
-// finish closes off the large upload
-func (up *largeUpload) finish(ctx context.Context) error {
+// Close closes off the large upload
+func (up *largeUpload) Close(ctx context.Context) error {
 	fs.Debugf(up.o, "Finishing large file %s with %d parts", up.what, up.parts)
 	opts := rest.Opts{
 		Method: "POST",
@@ -329,8 +348,8 @@ func (up *largeUpload) finish(ctx context.Context) error {
 	return up.o.decodeMetaDataFileInfo(&response)
 }
 
-// cancel aborts the large upload
-func (up *largeUpload) cancel(ctx context.Context) error {
+// Abort aborts the large upload
+func (up *largeUpload) Abort(ctx context.Context) error {
 	fs.Debugf(up.o, "Cancelling large file %s", up.what)
 	opts := rest.Opts{
 		Method: "POST",
@@ -355,157 +374,98 @@ func (up *largeUpload) cancel(ctx context.Context) error {
 // reaches EOF.
 //
 // Note that initialUploadBlock must be returned to f.putBuf()
-func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock []byte) (err error) {
-	defer atexit.OnError(&err, func() { _ = up.cancel(ctx) })()
+func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW) (err error) {
+	defer atexit.OnError(&err, func() { _ = up.Abort(ctx) })()
 	fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id)
 	var (
 		g, gCtx      = errgroup.WithContext(ctx)
 		hasMoreParts = true
 	)
-	up.size = int64(len(initialUploadBlock))
-	g.Go(func() error {
-		for part := int64(1); hasMoreParts; part++ {
-			// Get a block of memory from the pool and token which limits concurrency.
-			var buf []byte
-			if part == 1 {
-				buf = initialUploadBlock
-			} else {
-				buf = up.f.getBuf(false)
-			}
-
-			// Fail fast, in case an errgroup managed function returns an error
-			// gCtx is cancelled. There is no point in uploading all the other parts.
-			if gCtx.Err() != nil {
-				up.f.putBuf(buf, false)
-				return nil
-			}
-
-			// Read the chunk
-			var n int
-			if part == 1 {
-				n = len(buf)
-			} else {
-				n, err = io.ReadFull(up.in, buf)
-				if err == io.ErrUnexpectedEOF {
-					fs.Debugf(up.o, "Read less than a full chunk, making this the last one.")
-					buf = buf[:n]
-					hasMoreParts = false
-				} else if err == io.EOF {
-					fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.")
-					up.f.putBuf(buf, false)
-					return nil
-				} else if err != nil {
-					// other kinds of errors indicate failure
-					up.f.putBuf(buf, false)
-					return err
-				}
-			}
-
-			// Keep stats up to date
-			up.parts = part
-			up.size += int64(n)
-			if part > maxParts {
-				up.f.putBuf(buf, false)
-				return fmt.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
-			}
-
-			part := part // for the closure
-			g.Go(func() (err error) {
-				defer up.f.putBuf(buf, false)
-				return up.transferChunk(gCtx, part, buf)
-			})
+	up.size = initialUploadBlock.Size()
+	for part := 0; hasMoreParts; part++ {
+		// Get a block of memory from the pool and token which limits concurrency.
+		var rw *pool.RW
+		if part == 1 {
+			rw = initialUploadBlock
+		} else {
+			rw = up.f.getRW(false)
 		}
-		return nil
-	})
+
+		// Fail fast, in case an errgroup managed function returns an error
+		// gCtx is cancelled. There is no point in uploading all the other parts.
+		if gCtx.Err() != nil {
+			up.f.putRW(rw)
+			break
+		}
+
+		// Read the chunk
+		var n int64
+		if part == 1 {
+			n = rw.Size()
+		} else {
+			n, err = io.CopyN(rw, up.in, up.chunkSize)
+			if err == io.EOF {
+				fs.Debugf(up.o, "Read less than a full chunk, making this the last one.")
+				hasMoreParts = false
+			} else if err != nil {
+				// other kinds of errors indicate failure
+				up.f.putRW(rw)
+				return err
+			}
+		}
+
+		// Keep stats up to date
+		up.parts = part
+		up.size += n
+		if part > maxParts {
+			up.f.putRW(rw)
+			return fmt.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
+		}
+
+		part := part // for the closure
+		g.Go(func() (err error) {
+			defer up.f.putRW(rw)
+			_, err = up.WriteChunk(gCtx, part, rw)
+			return err
+		})
+	}
 	err = g.Wait()
 	if err != nil {
 		return err
 	}
-	up.sha1s = up.sha1s[:up.parts]
-	return up.finish(ctx)
+	return up.Close(ctx)
 }
 
-// Upload uploads the chunks from the input
-func (up *largeUpload) Upload(ctx context.Context) (err error) {
-	defer atexit.OnError(&err, func() { _ = up.cancel(ctx) })()
+// Copy the chunks from the source to the destination
+func (up *largeUpload) Copy(ctx context.Context) (err error) {
+	defer atexit.OnError(&err, func() { _ = up.Abort(ctx) })()
 	fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id)
 	var (
-		g, gCtx    = errgroup.WithContext(ctx)
-		remaining  = up.size
-		uploadPool *pool.Pool
-		ci         = fs.GetConfig(ctx)
+		g, gCtx   = errgroup.WithContext(ctx)
+		remaining = up.size
 	)
-	// If using large chunk size then make a temporary pool
-	if up.chunkSize <= int64(up.f.opt.ChunkSize) {
-		uploadPool = up.f.pool
-	} else {
-		uploadPool = pool.New(
-			time.Duration(up.f.opt.MemoryPoolFlushTime),
-			int(up.chunkSize),
-			ci.Transfers,
-			up.f.opt.MemoryPoolUseMmap,
-		)
-		defer uploadPool.Flush()
-	}
-	// Get an upload token and a buffer
-	getBuf := func() (buf []byte) {
-		up.f.getBuf(true)
-		if !up.doCopy {
-			buf = uploadPool.Get()
+	g.SetLimit(up.f.opt.UploadConcurrency)
+	for part := 0; part <= up.parts; part++ {
+		// Fail fast, in case an errgroup managed function returns an error
+		// gCtx is cancelled. There is no point in copying all the other parts.
+		if gCtx.Err() != nil {
+			break
 		}
-		return buf
-	}
-	// Put an upload token and a buffer
-	putBuf := func(buf []byte) {
-		if !up.doCopy {
-			uploadPool.Put(buf)
+
+		reqSize := remaining
+		if reqSize >= up.chunkSize {
+			reqSize = up.chunkSize
 		}
-		up.f.putBuf(nil, true)
+
+		part := part // for the closure
+		g.Go(func() (err error) {
+			return up.copyChunk(gCtx, part, reqSize)
+		})
+		remaining -= reqSize
 	}
-	g.Go(func() error {
-		for part := int64(1); part <= up.parts; part++ {
-			// Get a block of memory from the pool and token which limits concurrency.
-			buf := getBuf()
-
-			// Fail fast, in case an errgroup managed function returns an error
-			// gCtx is cancelled. There is no point in uploading all the other parts.
-			if gCtx.Err() != nil {
-				putBuf(buf)
-				return nil
-			}
-
-			reqSize := remaining
-			if reqSize >= up.chunkSize {
-				reqSize = up.chunkSize
-			}
-
-			if !up.doCopy {
-				// Read the chunk
-				buf = buf[:reqSize]
-				_, err = io.ReadFull(up.in, buf)
-				if err != nil {
-					putBuf(buf)
-					return err
-				}
-			}
-
-			part := part // for the closure
-			g.Go(func() (err error) {
-				defer putBuf(buf)
-				if !up.doCopy {
-					err = up.transferChunk(gCtx, part, buf)
-				} else {
-					err = up.copyChunk(gCtx, part, reqSize)
-				}
-				return err
-			})
-			remaining -= reqSize
-		}
-		return nil
-	})
 	err = g.Wait()
 	if err != nil {
 		return err
 	}
-	return up.finish(ctx)
+	return up.Close(ctx)
 }
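
For context, a minimal sketch of how the fs.ChunkWriter returned by the new (*Fs).OpenChunkWriter is meant to be driven. This is not part of the patch: the sequential loop, the uploadInChunks helper name and the in-memory buffering below are illustrative assumptions; in rclone the equivalent loop lives in lib/multipart.UploadMultipart and runs up to --b2-upload-concurrency WriteChunk calls in parallel.

package b2

import (
	"bytes"
	"context"
	"io"

	"github.com/rclone/rclone/fs"
)

// uploadInChunks is an illustrative, sequential driver for the
// fs.ChunkWriter returned by (*Fs).OpenChunkWriter (assumption, not
// part of the patch).
func uploadInChunks(ctx context.Context, f *Fs, src fs.ObjectInfo, in io.Reader) error {
	chunkSize, writer, err := f.OpenChunkWriter(ctx, src.Remote(), src)
	if err != nil {
		return err
	}
	for chunkNumber := 0; ; chunkNumber++ {
		buf := make([]byte, chunkSize)
		n, readErr := io.ReadFull(in, buf)
		if n > 0 {
			// WriteChunk takes an io.ReadSeeker so it can discover the part
			// size and rewind on retry; chunk numbers start at 0 and are
			// translated to B2's 1-based X-Bz-Part-Number internally.
			if _, err := writer.WriteChunk(ctx, chunkNumber, bytes.NewReader(buf[:n])); err != nil {
				_ = writer.Abort(ctx) // cancels the pending large file
				return err
			}
		}
		if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
			break
		} else if readErr != nil {
			_ = writer.Abort(ctx)
			return readErr
		}
	}
	// Close finishes the large file using the SHA1s collected per chunk.
	return writer.Close(ctx)
}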