b2: calculate missing hashes on the fly instead of spooling – fixes #1288
This commit is contained in:
parent
001431d326
commit
b510c70c1e
2 changed files with 57 additions and 41 deletions
41
b2/b2.go
41
b2/b2.go
|
@ -10,9 +10,7 @@ import (
|
|||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
@ -1249,42 +1247,13 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
|||
}
|
||||
|
||||
modTime := src.ModTime()
|
||||
|
||||
calculatedSha1, _ := src.Hash(fs.HashSHA1)
|
||||
|
||||
// If source cannot provide the hash, copy to a temporary file
|
||||
// and calculate the hash while doing so.
|
||||
// Then we serve the temporary file.
|
||||
if calculatedSha1 == "" {
|
||||
// Open a temp file to copy the input
|
||||
fd, err := ioutil.TempFile("", "rclone-b2-")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_ = os.Remove(fd.Name()) // Delete the file - may not work on Windows
|
||||
defer func() {
|
||||
_ = fd.Close() // Ignore error may have been closed already
|
||||
_ = os.Remove(fd.Name()) // Delete the file - may have been deleted already
|
||||
}()
|
||||
|
||||
// Copy the input while calculating the sha1
|
||||
hash := sha1.New()
|
||||
teed := io.TeeReader(in, hash)
|
||||
n, err := io.Copy(fd, teed)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n != size {
|
||||
return errors.Errorf("read %d bytes expecting %d", n, size)
|
||||
}
|
||||
calculatedSha1 = fmt.Sprintf("%x", hash.Sum(nil))
|
||||
|
||||
// Rewind the temporary file
|
||||
_, err = fd.Seek(0, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Set input to temporary file
|
||||
in = fd
|
||||
calculatedSha1 = "hex_digits_at_end"
|
||||
har := newHashAppendingReader(in, sha1.New())
|
||||
size += int64(har.AdditionalLength())
|
||||
in = har
|
||||
}
|
||||
|
||||
// Get upload URL
|
||||
|
|
57
b2/upload.go
57
b2/upload.go
|
@ -7,8 +7,11 @@ package b2
|
|||
import (
|
||||
"bytes"
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/ncw/rclone/b2/api"
|
||||
|
@ -17,6 +20,49 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type hashAppendingReader struct {
|
||||
h hash.Hash
|
||||
in io.Reader
|
||||
hexSum string
|
||||
hexReader io.Reader
|
||||
}
|
||||
|
||||
// Read returns bytes all bytes from the original reader, then the hex sum
|
||||
// of what was read so far, then EOF.
|
||||
func (har *hashAppendingReader) Read(b []byte) (int, error) {
|
||||
if har.hexReader == nil {
|
||||
n, err := har.in.Read(b)
|
||||
if err == io.EOF {
|
||||
har.in = nil // allow GC
|
||||
err = nil // allow reading hexSum before EOF
|
||||
|
||||
har.hexSum = hex.EncodeToString(har.h.Sum(nil))
|
||||
har.hexReader = strings.NewReader(har.hexSum)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
return har.hexReader.Read(b)
|
||||
}
|
||||
|
||||
// AdditionalLength returns how many bytes the appended hex sum will take up.
|
||||
func (har *hashAppendingReader) AdditionalLength() int {
|
||||
return hex.EncodedLen(har.h.Size())
|
||||
}
|
||||
|
||||
// HexSum returns the hash sum as hex. It's only available after the original
|
||||
// reader has EOF'd. It's an empty string before that.
|
||||
func (har *hashAppendingReader) HexSum() string {
|
||||
return har.hexSum
|
||||
}
|
||||
|
||||
// newHashAppendingReader takes a Reader and a Hash and will append the hex sum
|
||||
// after the original reader reaches EOF. The increased size depends on the
|
||||
// given hash, which may be queried through AdditionalLength()
|
||||
func newHashAppendingReader(in io.Reader, h hash.Hash) *hashAppendingReader {
|
||||
withHash := io.TeeReader(in, h)
|
||||
return &hashAppendingReader{h: h, in: withHash}
|
||||
}
|
||||
|
||||
// largeUpload is used to control the upload of large files which need chunking
|
||||
type largeUpload struct {
|
||||
f *Fs // parent Fs
|
||||
|
@ -128,9 +174,9 @@ func (up *largeUpload) clearUploadURL() {
|
|||
|
||||
// Transfer a chunk
|
||||
func (up *largeUpload) transferChunk(part int64, body []byte) error {
|
||||
calculatedSHA1 := fmt.Sprintf("%x", sha1.Sum(body))
|
||||
up.sha1s[part-1] = calculatedSHA1
|
||||
size := int64(len(body))
|
||||
in := newHashAppendingReader(bytes.NewReader(body), sha1.New())
|
||||
size := int64(len(body)) + int64(in.AdditionalLength())
|
||||
|
||||
err := up.f.pacer.Call(func() (bool, error) {
|
||||
fs.Debugf(up.o, "Sending chunk %d length %d", part, len(body))
|
||||
|
||||
|
@ -165,11 +211,11 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error {
|
|||
opts := rest.Opts{
|
||||
Method: "POST",
|
||||
RootURL: upload.UploadURL,
|
||||
Body: fs.AccountPart(up.o, bytes.NewBuffer(body)),
|
||||
Body: fs.AccountPart(up.o, in),
|
||||
ExtraHeaders: map[string]string{
|
||||
"Authorization": upload.AuthorizationToken,
|
||||
"X-Bz-Part-Number": fmt.Sprintf("%d", part),
|
||||
sha1Header: calculatedSHA1,
|
||||
sha1Header: "hex_digits_at_end",
|
||||
},
|
||||
ContentLength: &size,
|
||||
}
|
||||
|
@ -191,6 +237,7 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error {
|
|||
} else {
|
||||
fs.Debugf(up.o, "Done sending chunk %d", part)
|
||||
}
|
||||
up.sha1s[part-1] = in.HexSum()
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue