forked from TrueCloudLab/rclone
compare checksums on upload/download via FUSE
This commit is contained in:
parent
d5c0fe632f
commit
930ff266f2
4 changed files with 122 additions and 23 deletions
|
@ -22,6 +22,7 @@ import (
|
|||
// Globals
|
||||
var (
|
||||
noModTime = false
|
||||
noChecksum = false
|
||||
debugFUSE = false
|
||||
noSeek = false
|
||||
dirCacheTime = 5 * 60 * time.Second
|
||||
|
@ -47,6 +48,7 @@ func init() {
|
|||
unix.Umask(umask) // set it back to what it was
|
||||
cmd.Root.AddCommand(commandDefintion)
|
||||
commandDefintion.Flags().BoolVarP(&noModTime, "no-modtime", "", noModTime, "Don't read/write the modification time (can speed things up).")
|
||||
commandDefintion.Flags().BoolVarP(&noChecksum, "no-checksum", "", noChecksum, "Don't compare checksums on up/download.")
|
||||
commandDefintion.Flags().BoolVarP(&debugFUSE, "debug-fuse", "", debugFUSE, "Debug the FUSE internals - needs -v.")
|
||||
commandDefintion.Flags().BoolVarP(&noSeek, "no-seek", "", noSeek, "Don't allow seeking in files.")
|
||||
commandDefintion.Flags().DurationVarP(&dirCacheTime, "dir-cache-time", "", dirCacheTime, "Time to cache directory entries for.")
|
||||
|
@ -126,10 +128,6 @@ files to be visible in the mount.
|
|||
* those which need to know the size in advance won't - eg B2
|
||||
* maybe should pass in size as -1 to mean work it out
|
||||
* Or put in an an upload cache to cache the files on disk first
|
||||
|
||||
### TODO ###
|
||||
|
||||
* Check hashes on upload/download
|
||||
`,
|
||||
Run: func(command *cobra.Command, args []string) {
|
||||
cmd.CheckArgs(2, 2, command, args)
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"bazil.org/fuse"
|
||||
fusefs "bazil.org/fuse/fs"
|
||||
"github.com/ncw/rclone/fs"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
|
@ -20,6 +21,7 @@ type ReadFileHandle struct {
|
|||
o fs.Object
|
||||
readCalled bool // set if read has been called
|
||||
offset int64
|
||||
hash *fs.MultiHasher
|
||||
}
|
||||
|
||||
func newReadFileHandle(o fs.Object) (*ReadFileHandle, error) {
|
||||
|
@ -27,9 +29,19 @@ func newReadFileHandle(o fs.Object) (*ReadFileHandle, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var hash *fs.MultiHasher
|
||||
if !noChecksum {
|
||||
hash, err = fs.NewMultiHasherTypes(o.Fs().Hashes())
|
||||
if err != nil {
|
||||
fs.Errorf(o.Fs(), "newReadFileHandle hash error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
fh := &ReadFileHandle{
|
||||
o: o,
|
||||
r: fs.NewAccount(r, o).WithBuffer(), // account the transfer
|
||||
o: o,
|
||||
r: fs.NewAccount(r, o).WithBuffer(), // account the transfer
|
||||
hash: hash,
|
||||
}
|
||||
fs.Stats.Transferring(fh.o.Remote())
|
||||
return fh, nil
|
||||
|
@ -48,6 +60,7 @@ var _ fusefs.HandleReader = (*ReadFileHandle)(nil)
|
|||
// Must be called with fh.mu held
|
||||
func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
|
||||
fh.r.StopBuffering() // stop the background reading first
|
||||
fh.hash = nil
|
||||
oldReader := fh.r.GetReader()
|
||||
r := oldReader
|
||||
// Can we seek it directly?
|
||||
|
@ -141,6 +154,14 @@ func (fh *ReadFileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp
|
|||
resp.Data = buf[:n]
|
||||
fh.offset = newOffset
|
||||
fs.Debugf(fh.o, "ReadFileHandle.Read OK")
|
||||
|
||||
if fh.hash != nil {
|
||||
_, err = fh.hash.Write(resp.Data)
|
||||
if err != nil {
|
||||
fs.Errorf(fh.o, "ReadFileHandle.Read HashError: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -155,12 +176,35 @@ func (fh *ReadFileHandle) close() error {
|
|||
}
|
||||
fh.closed = true
|
||||
fs.Stats.DoneTransferring(fh.o.Remote(), true)
|
||||
|
||||
if err := fh.checkHash(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return fh.r.Close()
|
||||
}
|
||||
|
||||
// Check interface satisfied
|
||||
var _ fusefs.HandleFlusher = (*ReadFileHandle)(nil)
|
||||
|
||||
func (fh *ReadFileHandle) checkHash() error {
|
||||
if fh.hash == nil || !fh.readCalled || fh.offset < fh.o.Size() {
|
||||
return nil
|
||||
}
|
||||
|
||||
for hashType, dstSum := range fh.hash.Sums() {
|
||||
srcSum, err := fh.o.Hash(hashType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !fs.HashEquals(dstSum, srcSum) {
|
||||
return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, dstSum, srcSum)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Flush is called each time the file or directory is closed.
|
||||
// Because there can be multiple file descriptors referring to a
|
||||
// single opened file, Flush can be called multiple times.
|
||||
|
@ -169,23 +213,11 @@ func (fh *ReadFileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) err
|
|||
defer fh.mu.Unlock()
|
||||
fs.Debugf(fh.o, "ReadFileHandle.Flush")
|
||||
|
||||
// Ignore the Flush as there is nothing we can sensibly do and
|
||||
// it seems quite common for Flush to be called from
|
||||
// different threads each of which have read some data.
|
||||
if false {
|
||||
// If Read hasn't been called then ignore the Flush - Release
|
||||
// will pick it up
|
||||
if !fh.readCalled {
|
||||
fs.Debugf(fh.o, "ReadFileHandle.Flush ignoring flush on unread handle")
|
||||
return nil
|
||||
|
||||
}
|
||||
err := fh.close()
|
||||
if err != nil {
|
||||
fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err)
|
||||
return err
|
||||
}
|
||||
if err := fh.checkHash(); err != nil {
|
||||
fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
fs.Debugf(fh.o, "ReadFileHandle.Flush OK")
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -37,6 +37,45 @@ func TestReadByByte(t *testing.T) {
|
|||
run.rm(t, "testfile")
|
||||
}
|
||||
|
||||
func TestReadChecksum(t *testing.T) {
|
||||
run.skipIfNoFUSE(t)
|
||||
|
||||
// create file big enough so we exceed any single FUSE read
|
||||
// request
|
||||
b := make([]rune, 3*128*1024)
|
||||
for i := range b {
|
||||
b[i] = 'r'
|
||||
}
|
||||
run.createFile(t, "bigfile", string(b))
|
||||
|
||||
// The hash comparison would fail in Flush, if we did not
|
||||
// ensure we read the whole file
|
||||
fd, err := os.Open(run.path("bigfile"))
|
||||
assert.NoError(t, err)
|
||||
buf := make([]byte, 10)
|
||||
_, err = io.ReadFull(fd, buf)
|
||||
assert.NoError(t, err)
|
||||
err = fd.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// The hash comparison would fail, because we only read parts
|
||||
// of the file
|
||||
fd, err = os.Open(run.path("bigfile"))
|
||||
assert.NoError(t, err)
|
||||
// read at start
|
||||
_, err = io.ReadFull(fd, buf)
|
||||
assert.NoError(t, err)
|
||||
// read at end
|
||||
_, err = fd.Seek(int64(len(b)-len(buf)), 0)
|
||||
assert.NoError(t, err)
|
||||
_, err = io.ReadFull(fd, buf)
|
||||
// ensure we don't compare hashes
|
||||
err = fd.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
run.rm(t, "bigfile")
|
||||
}
|
||||
|
||||
// Test double close
|
||||
func TestReadFileDoubleClose(t *testing.T) {
|
||||
run.skipIfNoFUSE(t)
|
||||
|
|
|
@ -3,13 +3,13 @@
|
|||
package mount
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"bazil.org/fuse"
|
||||
fusefs "bazil.org/fuse/fs"
|
||||
"github.com/ncw/rclone/fs"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
|
@ -26,17 +26,29 @@ type WriteFileHandle struct {
|
|||
result chan error
|
||||
file *File
|
||||
writeCalled bool // set the first time Write() is called
|
||||
hash *fs.MultiHasher
|
||||
}
|
||||
|
||||
// Check interface satisfied
|
||||
var _ fusefs.Handle = (*WriteFileHandle)(nil)
|
||||
|
||||
func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, error) {
|
||||
var hash *fs.MultiHasher
|
||||
if !noChecksum {
|
||||
var err error
|
||||
hash, err = fs.NewMultiHasherTypes(src.Fs().Hashes())
|
||||
if err != nil {
|
||||
fs.Errorf(src.Fs(), "newWriteFileHandle hash error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
fh := &WriteFileHandle{
|
||||
remote: src.Remote(),
|
||||
result: make(chan error, 1),
|
||||
file: f,
|
||||
hash: hash,
|
||||
}
|
||||
|
||||
fh.pipeReader, fh.pipeWriter = io.Pipe()
|
||||
r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()).WithBuffer() // account the transfer
|
||||
go func() {
|
||||
|
@ -71,6 +83,13 @@ func (fh *WriteFileHandle) Write(ctx context.Context, req *fuse.WriteRequest, re
|
|||
return err
|
||||
}
|
||||
fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n)
|
||||
if fh.hash != nil {
|
||||
_, err = fh.hash.Write(req.Data)
|
||||
if err != nil {
|
||||
fs.Errorf(fh.remote, "WriteFileHandle.Write HashError: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -95,6 +114,17 @@ func (fh *WriteFileHandle) close() error {
|
|||
if err == nil {
|
||||
err = readCloseErr
|
||||
}
|
||||
if err == nil && fh.hash != nil {
|
||||
for hashType, srcSum := range fh.hash.Sums() {
|
||||
dstSum, err := fh.o.Hash(hashType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !fs.HashEquals(srcSum, dstSum) {
|
||||
return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum)
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue