Add options to Put, PutUnchecked and Update, add HashOption and speed up local

* Add options to Put, PutUnchecked and Update for all Fses
  * Use these to create HashOption
  * Implement this in local
  * Pass the option in fs.Copy

This has the effect that we only calculate hashes we need to in the
local Fs which speeds up transfers significantly.
This commit is contained in:
Nick Craig-Wood 2017-05-28 12:44:22 +01:00
parent 6381959850
commit 20da3e6352
17 changed files with 101 additions and 54 deletions

View file

@ -534,7 +534,7 @@ func (f *Fs) checkUpload(resp *http.Response, in io.Reader, src fs.ObjectInfo, i
// Copy the reader in to the new object which is returned
//
// The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
remote := src.Remote()
size := src.Size()
// Temporary Object under construction
@ -1002,7 +1002,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
// Update the object with the contents of the io.Reader, modTime and size
//
// The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
file := acd.File{Node: o.info}
var info *acd.File
var resp *http.Response

View file

@ -660,7 +660,7 @@ func (f *Fs) clearBucketID() {
// Copy the reader in to the new object which is returned
//
// The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// Temporary Object under construction
fs := &Object{
fs: f,
@ -1161,7 +1161,7 @@ func urlEncode(in string) string {
// Update the object with the contents of the io.Reader, modTime and size
//
// The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
if *b2Versions {
return errNotWithVersions
}

View file

@ -162,7 +162,7 @@ func (f *Fs) NewObject(remote string) (fs.Object, error) {
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
wrappedIn, err := f.cipher.EncryptData(in)
if err != nil {
return nil, err
@ -282,7 +282,7 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error {
//
// This will create a duplicate if we upload a new file without
// checking to see if there is one already - use Put() for that.
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
do := f.Fs.Features().PutUnchecked
if do == nil {
return nil, errors.New("can't PutUnchecked")
@ -455,7 +455,7 @@ func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) {
}
// Update in to the object with the modTime given of the given size
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
wrappedIn, err := o.f.cipher.EncryptData(in)
if err != nil {
return err

View file

@ -584,7 +584,7 @@ func (f *Fs) createFileInfo(remote string, modTime time.Time, size int64) (*Obje
// Copy the reader in to the new object which is returned
//
// The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
exisitingObj, err := f.newObjectWithInfo(src.Remote(), nil)
switch err {
case nil:
@ -601,7 +601,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
//
// This will create a duplicate if we upload a new file without
// checking to see if there is one already - use Put() for that.
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
remote := src.Remote()
size := src.Size()
modTime := src.ModTime()
@ -1212,7 +1212,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
// Copy the reader into the object updating modTime and size
//
// The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
size := src.Size()
modTime := src.ModTime()
if o.isDocument {

View file

@ -424,13 +424,13 @@ func (rc *readCloser) Close() error {
// Copy the reader in to the new object which is returned
//
// The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// Temporary Object under construction
o := &Object{
fs: f,
remote: src.Remote(),
}
return o, o.Update(in, src)
return o, o.Update(in, src, options...)
}
// Mkdir creates the container if it doesn't exist
@ -835,7 +835,7 @@ func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size
// Copy the reader into the object updating modTime and size
//
// The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
remote := o.remotePath()
if ignoredFiles.MatchString(remote) {
fs.Logf(o, "File name disallowed - not uploading")

View file

@ -129,7 +129,7 @@ type Fs interface {
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
Put(in io.Reader, src ObjectInfo) (Object, error)
Put(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
// Mkdir makes the directory (container, bucket)
//
@ -174,7 +174,7 @@ type Object interface {
Open(options ...OpenOption) (io.ReadCloser, error)
// Update in to the object with the modTime given of the given size
Update(in io.Reader, src ObjectInfo) error
Update(in io.Reader, src ObjectInfo, options ...OpenOption) error
// Removes this object
Remove() error
@ -287,7 +287,7 @@ type Features struct {
//
// May create duplicates or return errors if src already
// exists.
PutUnchecked func(in io.Reader, src ObjectInfo) (Object, error)
PutUnchecked func(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
// CleanUp the trash in the Fs
//
@ -466,7 +466,7 @@ type PutUncheckeder interface {
//
// May create duplicates or return errors if src already
// exists.
PutUnchecked(in io.Reader, src ObjectInfo) (Object, error)
PutUnchecked(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
}
// CleanUpper is an optional interfaces for Fs

View file

@ -31,7 +31,9 @@ func (o mockObject) Size() int64 { return
func (o mockObject) Storable() bool { return true }
func (o mockObject) SetModTime(time.Time) error { return errNotImpl }
func (o mockObject) Open(options ...OpenOption) (io.ReadCloser, error) { return nil, errNotImpl }
func (o mockObject) Update(in io.Reader, src ObjectInfo) error { return errNotImpl }
func (o mockObject) Update(in io.Reader, src ObjectInfo, options ...OpenOption) error {
return errNotImpl
}
func (o mockObject) Remove() error { return errNotImpl }
type mockFs struct {

View file

@ -268,6 +268,17 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
maxTries := Config.LowLevelRetries
tries := 0
doUpdate := dst != nil
// work out which hash to use - limit to 1 hash in common
var common HashSet
hashType := HashNone
if !Config.SizeOnly {
common = src.Fs().Hashes().Overlap(f.Hashes())
if common.Count() > 0 {
hashType = common.GetOne()
common = HashSet(hashType)
}
}
hashOption := &HashesOption{Hashes: common}
var actionTaken string
for {
// Try server side copy first - if has optional interface and
@ -285,7 +296,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
// If can't server side copy, do it manually
if err == ErrorCantCopy {
var in0 io.ReadCloser
in0, err = src.Open()
in0, err = src.Open(hashOption)
if err != nil {
err = errors.Wrap(err, "failed to open source object")
} else {
@ -297,10 +308,10 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
}
if doUpdate {
actionTaken = "Copied (replaced existing)"
err = dst.Update(in, wrappedSrc)
err = dst.Update(in, wrappedSrc, hashOption)
} else {
actionTaken = "Copied (new)"
dst, err = f.Put(in, wrappedSrc)
dst, err = f.Put(in, wrappedSrc, hashOption)
}
closeErr := in.Close()
if err == nil {
@ -338,12 +349,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
// Verify hashes are the same after transfer - ignoring blank hashes
// TODO(klauspost): This could be extended, so we always create a hash type matching
// the destination, and calculate it while sending.
common := src.Fs().Hashes().Overlap(dst.Fs().Hashes())
// Debugf(src, "common hashes: %v", common)
if !Config.SizeOnly && common.Count() > 0 {
// Get common hash type
hashType := common.GetOne()
if hashType != HashNone {
var srcSum string
srcSum, err = src.Hash(hashType)
if err != nil {

View file

@ -94,6 +94,27 @@ func (o *HTTPOption) Mandatory() bool {
return false
}
// HashesOption defines an option used to tell the local fs to limit
// the number of hashes it calculates.
type HashesOption struct {
Hashes HashSet
}
// Header formats the option as an http header
func (o *HashesOption) Header() (key string, value string) {
return "", ""
}
// String formats the option into human readable form
func (o *HashesOption) String() string {
return fmt.Sprintf("HashesOption(%v)", o.Hashes)
}
// Mandatory returns whether the option must be parsed or can be ignored
func (o *HashesOption) Mandatory() bool {
return false
}
// OpenOptionAddHeaders adds each header found in options to the
// headers map provided the key was non empty.
func OpenOptionAddHeaders(options []OpenOption, headers map[string]string) {

View file

@ -380,7 +380,7 @@ func (f *Fs) Precision() time.Duration {
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// fs.Debugf(f, "Trying to put file %s", src.Remote())
err := f.mkParentDir(src.Remote())
if err != nil {
@ -390,7 +390,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
fs: f,
remote: src.Remote(),
}
err = o.Update(in, src)
err = o.Update(in, src, options...)
return o, err
}
@ -674,7 +674,7 @@ func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) {
// Copy the reader into the object updating modTime and size
//
// The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
// defer fs.Trace(o, "src=%v", src)("err=%v", &err)
path := path.Join(o.fs.root, o.remote)
// remove the file if upload failed

View file

@ -445,13 +445,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
// Copy the reader in to the new object which is returned
//
// The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// Temporary Object under construction
o := &Object{
fs: f,
remote: src.Remote(),
}
return o, o.Update(in, src)
return o, o.Update(in, src, options...)
}
// Mkdir creates the bucket if it doesn't exist
@ -683,7 +683,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
// Update the object with the contents of the io.Reader, modTime and size
//
// The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
size := src.Size()
modTime := src.ModTime()

View file

@ -372,11 +372,11 @@ func (m *mapper) Save(in, out string) string {
}
// Put the Object to the local filesystem
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
remote := src.Remote()
// Temporary Object under construction - info filled in by Update()
o := f.newObject(remote, "")
err := o.Update(in, src)
err := o.Update(in, src, options...)
if err != nil {
return nil, err
}
@ -707,10 +707,13 @@ func (file *localOpenFile) Close() (err error) {
// Open an object for read
func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
var offset int64
hashes := fs.SupportedHashes
for _, option := range options {
switch x := option.(type) {
case *fs.SeekOption:
offset = x.Offset
case *fs.HashesOption:
hashes = x.Hashes
default:
if option.Mandatory() {
fs.Logf(o, "Unsupported mandatory option: %v", option)
@ -728,11 +731,15 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
// don't attempt to make checksums
return fd, err
}
hash, err := fs.NewMultiHasherTypes(hashes)
if err != nil {
return nil, err
}
// Update the md5sum as we go along
in = &localOpenFile{
o: o,
in: fd,
hash: fs.NewMultiHasher(),
hash: hash,
}
return in, nil
}
@ -744,7 +751,15 @@ func (o *Object) mkdirAll() error {
}
// Update the object from in with modTime and size
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
hashes := fs.SupportedHashes
for _, option := range options {
switch x := option.(type) {
case *fs.HashesOption:
hashes = x.Hashes
}
}
err := o.mkdirAll()
if err != nil {
return err
@ -756,7 +771,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
}
// Calculate the hash of the object we are reading as we go along
hash := fs.NewMultiHasher()
hash, err := fs.NewMultiHasherTypes(hashes)
if err != nil {
return err
}
in = io.TeeReader(in, hash)
_, err = io.Copy(out, in)

View file

@ -468,7 +468,7 @@ func (f *Fs) createObject(remote string, modTime time.Time, size int64) (o *Obje
// Copy the reader in to the new object which is returned
//
// The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
remote := src.Remote()
size := src.Size()
modTime := src.ModTime()
@ -477,7 +477,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
if err != nil {
return nil, err
}
return o, o.Update(in, src)
return o, o.Update(in, src, options...)
}
// Mkdir creates the container if it doesn't exist
@ -1007,7 +1007,7 @@ func (o *Object) uploadMultipart(in io.Reader, size int64) (err error) {
// Update the object with the contents of the io.Reader, modTime and size
//
// The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
o.fs.tokenRenewer.Start()
defer o.fs.tokenRenewer.Stop()

View file

@ -624,13 +624,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
}
// Put the Object into the bucket
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// Temporary Object under construction
fs := &Object{
fs: f,
remote: src.Remote(),
}
return fs, fs.Update(in, src)
return fs, fs.Update(in, src, options...)
}
// Check if the bucket exists
@ -902,7 +902,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
}
// Update the Object from in with modTime and size
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
modTime := src.ModTime()
uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {

View file

@ -290,7 +290,7 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
}
// Put data from <in> into a new remote sftp file object described by <src.Remote()> and <src.ModTime()>
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
err := f.mkParentDir(src.Remote())
if err != nil {
return nil, errors.Wrap(err, "Put mkParentDir failed")
@ -300,7 +300,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
fs: f,
remote: src.Remote(),
}
err = o.Update(in, src)
err = o.Update(in, src, options...)
if err != nil {
return nil, err
}
@ -543,7 +543,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
}
// Update a remote sftp file using the data <in> and ModTime from <src>
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
file, err := o.fs.sftpClient.Create(o.path())
if err != nil {
return errors.Wrap(err, "Update Create failed")

View file

@ -399,13 +399,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
// Copy the reader in to the new object which is returned
//
// The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// Temporary Object under construction
fs := &Object{
fs: f,
remote: src.Remote(),
}
return fs, fs.Update(in, src)
return fs, fs.Update(in, src, options...)
}
// Mkdir creates the container if it doesn't exist
@ -737,7 +737,7 @@ func (o *Object) updateChunks(in io.Reader, headers swift.Headers, size int64, c
// Update the object with the contents of the io.Reader, modTime and size
//
// The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
size := src.Size()
modTime := src.ModTime()

View file

@ -397,7 +397,7 @@ func (o *Object) readMetaData() (err error) {
// Copy the reader in to the new object which is returned
//
// The new object may have been created if an error is returned
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
remote := src.Remote()
size := src.Size()
modTime := src.ModTime()
@ -409,7 +409,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
modTime: modTime,
}
//TODO maybe read metadata after upload to check if file uploaded successfully
return o, o.Update(in, src)
return o, o.Update(in, src, options...)
}
// Mkdir creates the container if it doesn't exist
@ -556,7 +556,7 @@ func (o *Object) remotePath() string {
// Copy the reader into the object updating modTime and size
//
// The new object may have been created if an error is returned
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
size := src.Size()
modTime := src.ModTime()