diff --git a/b2/b2.go b/b2/b2.go
index cbdfc9e79..152c8fddf 100644
--- a/b2/b2.go
+++ b/b2/b2.go
@@ -5,6 +5,7 @@ package b2
 // checking SHA1s?
 
 import (
+	"bufio"
 	"bytes"
 	"crypto/sha1"
 	"fmt"
@@ -705,6 +706,11 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
 	return fs, fs.Update(in, src, options...)
 }
 
+// PutStream uploads to the remote path with the modTime given of indeterminate size
+func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
+	return f.Put(in, src, options...)
+}
+
 // Mkdir creates the bucket if it doesn't exist
 func (f *Fs) Mkdir(dir string) error {
 	f.bucketOKMu.Lock()
@@ -1237,8 +1243,33 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
 	}
 	size := src.Size()
 
-	// If a large file upload in chunks - see upload.go
-	if size >= int64(uploadCutoff) {
+	if size == -1 {
+		// Check if the file is large enough for a chunked upload (needs to be at least two chunks)
+		buf := o.fs.getUploadBlock()
+		n, err := io.ReadFull(in, buf)
+		if err == nil {
+			bufReader := bufio.NewReader(in)
+			in = bufReader
+			_, err = bufReader.Peek(1)
+		}
+
+		if err == nil {
+			fs.Debugf(o, "File is big enough for chunked streaming")
+			up, err := o.fs.newLargeUpload(o, in, src)
+			if err != nil {
+				o.fs.putUploadBlock(buf)
+				return err
+			}
+			return up.Stream(buf)
+		} else if err == io.EOF || err == io.ErrUnexpectedEOF {
+			fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n)
+			defer o.fs.putUploadBlock(buf)
+			size = int64(n)
+			in = bytes.NewReader(buf[:n])
+		} else {
+			return err
+		}
+	} else if size > int64(uploadCutoff) {
 		up, err := o.fs.newLargeUpload(o, in, src)
 		if err != nil {
 			return err
@@ -1373,10 +1404,11 @@ func (o *Object) MimeType() string {
 
 // Check the interfaces are satisfied
 var (
-	_ fs.Fs         = &Fs{}
-	_ fs.Purger     = &Fs{}
-	_ fs.CleanUpper = &Fs{}
-	_ fs.ListRer    = &Fs{}
-	_ fs.Object     = &Object{}
-	_ fs.MimeTyper  = &Object{}
+	_ fs.Fs          = &Fs{}
+	_ fs.Purger      = &Fs{}
+	_ fs.PutStreamer = &Fs{}
+	_ fs.CleanUpper  = &Fs{}
+	_ fs.ListRer     = &Fs{}
+	_ fs.Object      = &Object{}
+	_ fs.MimeTyper   = &Object{}
 )
diff --git a/b2/upload.go b/b2/upload.go
index a803991b3..dbf53731e 100644
--- a/b2/upload.go
+++ b/b2/upload.go
@@ -70,7 +70,7 @@ type largeUpload struct {
 	in       io.Reader                       // read the data from here
 	id       string                          // ID of the file being uploaded
 	size     int64                           // total size
-	parts    int64                           // calculated number of parts
+	parts    int64                           // calculated number of parts, if known
 	sha1s    []string                        // slice of SHA1s for each part
 	uploadMu sync.Mutex                      // lock for upload variable
 	uploads  []*api.GetUploadPartURLResponse // result of get upload URL calls
@@ -80,13 +80,21 @@ func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *largeUpload, err error) {
 	remote := o.remote
 	size := src.Size()
 
-	parts := size / int64(chunkSize)
-	if size%int64(chunkSize) != 0 {
-		parts++
-	}
-	if parts > maxParts {
-		return nil, errors.Errorf("%q too big (%d bytes) makes too many parts %d > %d - increase --b2-chunk-size", remote, size, parts, maxParts)
+	parts := int64(0)
+	sha1SliceSize := int64(maxParts)
+	if size == -1 {
+		fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", fs.SizeSuffix(chunkSize), fs.SizeSuffix(maxParts*chunkSize))
+	} else {
+		parts = size / int64(chunkSize)
+		if size%int64(chunkSize) != 0 {
+			parts++
+		}
+		if parts > maxParts {
+			return nil, errors.Errorf("%q too big (%d bytes) makes too many parts %d > %d - increase --b2-chunk-size", remote, size, parts, maxParts)
+		}
+		sha1SliceSize = parts
 	}
+
 	modTime := src.ModTime()
 	opts := rest.Opts{
 		Method: "POST",
@@ -123,7 +131,7 @@ func (f *Fs) newLargeUpload(o *Object, in io.Reader, src fs.ObjectInfo) (up *lar
 		id:    response.ID,
 		size:  size,
 		parts: parts,
-		sha1s: make([]string, parts),
+		sha1s: make([]string, sha1SliceSize),
 	}
 	return up, nil
 }
@@ -243,6 +251,7 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error {
 
 // finish closes off the large upload
 func (up *largeUpload) finish() error {
+	fs.Debugf(up.o, "Finishing large file upload with %d parts", up.parts)
 	opts := rest.Opts{
 		Method: "POST",
 		Path:   "/b2_finish_large_file",
@@ -279,6 +288,102 @@ func (up *largeUpload) cancel() error {
 	return err
 }
 
+func (up *largeUpload) managedTransferChunk(wg *sync.WaitGroup, errs chan error, part int64, buf []byte) {
+	wg.Add(1)
+	go func(part int64, buf []byte) {
+		defer wg.Done()
+		defer up.f.putUploadBlock(buf)
+		err := up.transferChunk(part, buf)
+		if err != nil {
+			select {
+			case errs <- err:
+			default:
+			}
+		}
+	}(part, buf)
+}
+
+func (up *largeUpload) finishOrCancelOnError(err error, errs chan error) error {
+	if err == nil {
+		select {
+		case err = <-errs:
+		default:
+		}
+	}
+	if err != nil {
+		fs.Debugf(up.o, "Cancelling large file upload due to error: %v", err)
+		cancelErr := up.cancel()
+		if cancelErr != nil {
+			fs.Errorf(up.o, "Failed to cancel large file upload: %v", cancelErr)
+		}
+		return err
+	}
+	return up.finish()
+}
+
+// Stream uploads the chunks from the input, starting with a required initial
+// chunk. Assumes the file size is unknown and will upload until the input
+// reaches EOF.
+func (up *largeUpload) Stream(initialUploadBlock []byte) (err error) {
+	fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id)
+	errs := make(chan error, 1)
+	hasMoreParts := true
+	var wg sync.WaitGroup
+	fs.AccountByPart(up.o) // Cancel whole file accounting before reading
+
+	// Transfer initial chunk
+	up.size = int64(len(initialUploadBlock))
+	up.managedTransferChunk(&wg, errs, 1, initialUploadBlock)
+
+outer:
+	for part := int64(2); hasMoreParts; part++ {
+		// Check any errors
+		select {
+		case err = <-errs:
+			break outer
+		default:
+		}
+
+		// Get a block of memory
+		buf := up.f.getUploadBlock()
+
+		// Read the chunk - assign to the named return err rather than
+		// shadowing it with := so failures reach finishOrCancelOnError
+		var n int
+		n, err = io.ReadFull(up.in, buf)
+		if err == io.ErrUnexpectedEOF {
+			fs.Debugf(up.o, "Read less than a full chunk, making this the last one.")
+			buf = buf[:n]
+			hasMoreParts = false
+			err = nil
+		} else if err == io.EOF {
+			fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.")
+			up.f.putUploadBlock(buf)
+			hasMoreParts = false
+			err = nil
+			break outer
+		} else if err != nil {
+			// other kinds of errors indicate failure
+			up.f.putUploadBlock(buf)
+			break outer
+		}
+
+		// Keep stats up to date
+		up.parts = part
+		up.size += int64(n)
+		if part > maxParts {
+			err = errors.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
+			break outer
+		}
+
+		// Transfer the chunk
+		up.managedTransferChunk(&wg, errs, part, buf)
+	}
+	wg.Wait()
+	up.sha1s = up.sha1s[:up.parts]
+
+	return up.finishOrCancelOnError(err, errs)
+}
+
 // Upload uploads the chunks from the input
 func (up *largeUpload) Upload() error {
 	fs.Debugf(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id)
@@ -312,37 +417,10 @@ outer:
 		}
 
 		// Transfer the chunk
-		wg.Add(1)
-		go func(part int64, buf []byte) {
-			defer wg.Done()
-			defer up.f.putUploadBlock(buf)
-			err := up.transferChunk(part, buf)
-			if err != nil {
-				select {
-				case errs <- err:
-				default:
-				}
-			}
-		}(part, buf)
-
+		up.managedTransferChunk(&wg, errs, part, buf)
 		remaining -= reqSize
 	}
 	wg.Wait()
-	if err == nil {
-		select {
-		case err = <-errs:
-		default:
-		}
-	}
-	if err != nil {
-		fs.Debugf(up.o, "Cancelling large file upload due to error: %v", err)
-		cancelErr := up.cancel()
-		if cancelErr != nil {
-			fs.Errorf(up.o, "Failed to cancel large file upload: %v", cancelErr)
-		}
-		return err
-	}
-	// Check any errors
-	fs.Debugf(up.o, "Finishing large file upload")
-	return up.finish()
+
+	return up.finishOrCancelOnError(err, errs)
 }
diff --git a/fs/operations.go b/fs/operations.go
index b8ff3ec83..b80a03a0a 100644
--- a/fs/operations.go
+++ b/fs/operations.go
@@ -121,7 +121,7 @@ func Equal(src ObjectInfo, dst Object) bool {
 func equal(src ObjectInfo, dst Object, sizeOnly, checkSum bool) bool {
 	if !Config.IgnoreSize {
 		if src.Size() != dst.Size() {
-			Debugf(src, "Sizes differ")
+			Debugf(src, "Sizes differ (src %d vs dst %d)", src.Size(), dst.Size())
 			return false
 		}
 	}
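A note for reviewers, separate from the diff itself: the standalone sketch below isolates the probe technique Update uses for size == -1 above. It reads one full chunk with io.ReadFull, then does Peek(1) through a bufio.Reader to choose between a direct single-chunk upload and chunked streaming, without losing any bytes. The names probe and chunkSize are illustrative stand-ins, not rclone APIs.

// probe_sketch.go - illustration of the chunk-probe pattern; hypothetical names.
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

const chunkSize = 8 // stand-in for rclone's --b2-chunk-size

// probe reads up to one chunk from in. It returns the bytes read, a reader
// positioned after them, and whether a chunked (multi-part) upload is needed.
func probe(in io.Reader) (first []byte, rest io.Reader, multi bool, err error) {
	buf := make([]byte, chunkSize)
	n, err := io.ReadFull(in, buf)
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		// Short read: the whole input fits in a single chunk.
		return buf[:n], strings.NewReader(""), false, nil
	}
	if err != nil {
		return nil, nil, false, err
	}
	// A full chunk was read. Peek one byte to learn whether more data
	// follows; the bufio.Reader retains the peeked byte, so nothing is lost.
	br := bufio.NewReader(in)
	if _, perr := br.Peek(1); perr == io.EOF {
		return buf, br, false, nil // exactly one chunk
	} else if perr != nil {
		return nil, nil, false, perr
	}
	return buf, br, true, nil // two or more chunks: stream it
}

func main() {
	for _, s := range []string{"tiny", "exactly8", "clearly more than one chunk"} {
		first, _, multi, err := probe(strings.NewReader(s))
		fmt.Printf("%-30q first=%d multi=%v err=%v\n", s, len(first), multi, err)
	}
}

Because the bufio.Reader keeps the peeked byte buffered, Update can hand it on as the replacement input reader; that byte is simply re-read as the start of the second chunk in Stream.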