Add options to Put, PutUnchecked and Update, add HashOption and speed up local
* Add options to Put, PutUnchecked and Update for all Fses * Use these to create HashOption * Implement this in local * Pass the option in fs.Copy This has the effect that we only calculate hashes we need to in the local Fs which speeds up transfers significantly.
This commit is contained in:
parent
6381959850
commit
20da3e6352
17 changed files with 101 additions and 54 deletions
|
@ -534,7 +534,7 @@ func (f *Fs) checkUpload(resp *http.Response, in io.Reader, src fs.ObjectInfo, i
|
||||||
// Copy the reader in to the new object which is returned
|
// Copy the reader in to the new object which is returned
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
remote := src.Remote()
|
remote := src.Remote()
|
||||||
size := src.Size()
|
size := src.Size()
|
||||||
// Temporary Object under construction
|
// Temporary Object under construction
|
||||||
|
@ -1002,7 +1002,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||||
// Update the object with the contents of the io.Reader, modTime and size
|
// Update the object with the contents of the io.Reader, modTime and size
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
file := acd.File{Node: o.info}
|
file := acd.File{Node: o.info}
|
||||||
var info *acd.File
|
var info *acd.File
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
|
|
4
b2/b2.go
4
b2/b2.go
|
@ -660,7 +660,7 @@ func (f *Fs) clearBucketID() {
|
||||||
// Copy the reader in to the new object which is returned
|
// Copy the reader in to the new object which is returned
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
// Temporary Object under construction
|
// Temporary Object under construction
|
||||||
fs := &Object{
|
fs := &Object{
|
||||||
fs: f,
|
fs: f,
|
||||||
|
@ -1161,7 +1161,7 @@ func urlEncode(in string) string {
|
||||||
// Update the object with the contents of the io.Reader, modTime and size
|
// Update the object with the contents of the io.Reader, modTime and size
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
|
||||||
if *b2Versions {
|
if *b2Versions {
|
||||||
return errNotWithVersions
|
return errNotWithVersions
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,7 +162,7 @@ func (f *Fs) NewObject(remote string) (fs.Object, error) {
|
||||||
// May create the object even if it returns an error - if so
|
// May create the object even if it returns an error - if so
|
||||||
// will return the object and the error, otherwise will return
|
// will return the object and the error, otherwise will return
|
||||||
// nil and the error
|
// nil and the error
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
wrappedIn, err := f.cipher.EncryptData(in)
|
wrappedIn, err := f.cipher.EncryptData(in)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -282,7 +282,7 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error {
|
||||||
//
|
//
|
||||||
// This will create a duplicate if we upload a new file without
|
// This will create a duplicate if we upload a new file without
|
||||||
// checking to see if there is one already - use Put() for that.
|
// checking to see if there is one already - use Put() for that.
|
||||||
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
do := f.Fs.Features().PutUnchecked
|
do := f.Fs.Features().PutUnchecked
|
||||||
if do == nil {
|
if do == nil {
|
||||||
return nil, errors.New("can't PutUnchecked")
|
return nil, errors.New("can't PutUnchecked")
|
||||||
|
@ -455,7 +455,7 @@ func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update in to the object with the modTime given of the given size
|
// Update in to the object with the modTime given of the given size
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
wrappedIn, err := o.f.cipher.EncryptData(in)
|
wrappedIn, err := o.f.cipher.EncryptData(in)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -584,7 +584,7 @@ func (f *Fs) createFileInfo(remote string, modTime time.Time, size int64) (*Obje
|
||||||
// Copy the reader in to the new object which is returned
|
// Copy the reader in to the new object which is returned
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
exisitingObj, err := f.newObjectWithInfo(src.Remote(), nil)
|
exisitingObj, err := f.newObjectWithInfo(src.Remote(), nil)
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
|
@ -601,7 +601,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
||||||
//
|
//
|
||||||
// This will create a duplicate if we upload a new file without
|
// This will create a duplicate if we upload a new file without
|
||||||
// checking to see if there is one already - use Put() for that.
|
// checking to see if there is one already - use Put() for that.
|
||||||
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
remote := src.Remote()
|
remote := src.Remote()
|
||||||
size := src.Size()
|
size := src.Size()
|
||||||
modTime := src.ModTime()
|
modTime := src.ModTime()
|
||||||
|
@ -1212,7 +1212,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||||
// Copy the reader into the object updating modTime and size
|
// Copy the reader into the object updating modTime and size
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
size := src.Size()
|
size := src.Size()
|
||||||
modTime := src.ModTime()
|
modTime := src.ModTime()
|
||||||
if o.isDocument {
|
if o.isDocument {
|
||||||
|
|
|
@ -424,13 +424,13 @@ func (rc *readCloser) Close() error {
|
||||||
// Copy the reader in to the new object which is returned
|
// Copy the reader in to the new object which is returned
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
// Temporary Object under construction
|
// Temporary Object under construction
|
||||||
o := &Object{
|
o := &Object{
|
||||||
fs: f,
|
fs: f,
|
||||||
remote: src.Remote(),
|
remote: src.Remote(),
|
||||||
}
|
}
|
||||||
return o, o.Update(in, src)
|
return o, o.Update(in, src, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mkdir creates the container if it doesn't exist
|
// Mkdir creates the container if it doesn't exist
|
||||||
|
@ -835,7 +835,7 @@ func (o *Object) uploadChunked(in io.Reader, commitInfo *files.CommitInfo, size
|
||||||
// Copy the reader into the object updating modTime and size
|
// Copy the reader into the object updating modTime and size
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
remote := o.remotePath()
|
remote := o.remotePath()
|
||||||
if ignoredFiles.MatchString(remote) {
|
if ignoredFiles.MatchString(remote) {
|
||||||
fs.Logf(o, "File name disallowed - not uploading")
|
fs.Logf(o, "File name disallowed - not uploading")
|
||||||
|
|
8
fs/fs.go
8
fs/fs.go
|
@ -129,7 +129,7 @@ type Fs interface {
|
||||||
// May create the object even if it returns an error - if so
|
// May create the object even if it returns an error - if so
|
||||||
// will return the object and the error, otherwise will return
|
// will return the object and the error, otherwise will return
|
||||||
// nil and the error
|
// nil and the error
|
||||||
Put(in io.Reader, src ObjectInfo) (Object, error)
|
Put(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
|
||||||
|
|
||||||
// Mkdir makes the directory (container, bucket)
|
// Mkdir makes the directory (container, bucket)
|
||||||
//
|
//
|
||||||
|
@ -174,7 +174,7 @@ type Object interface {
|
||||||
Open(options ...OpenOption) (io.ReadCloser, error)
|
Open(options ...OpenOption) (io.ReadCloser, error)
|
||||||
|
|
||||||
// Update in to the object with the modTime given of the given size
|
// Update in to the object with the modTime given of the given size
|
||||||
Update(in io.Reader, src ObjectInfo) error
|
Update(in io.Reader, src ObjectInfo, options ...OpenOption) error
|
||||||
|
|
||||||
// Removes this object
|
// Removes this object
|
||||||
Remove() error
|
Remove() error
|
||||||
|
@ -287,7 +287,7 @@ type Features struct {
|
||||||
//
|
//
|
||||||
// May create duplicates or return errors if src already
|
// May create duplicates or return errors if src already
|
||||||
// exists.
|
// exists.
|
||||||
PutUnchecked func(in io.Reader, src ObjectInfo) (Object, error)
|
PutUnchecked func(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
|
||||||
|
|
||||||
// CleanUp the trash in the Fs
|
// CleanUp the trash in the Fs
|
||||||
//
|
//
|
||||||
|
@ -466,7 +466,7 @@ type PutUncheckeder interface {
|
||||||
//
|
//
|
||||||
// May create duplicates or return errors if src already
|
// May create duplicates or return errors if src already
|
||||||
// exists.
|
// exists.
|
||||||
PutUnchecked(in io.Reader, src ObjectInfo) (Object, error)
|
PutUnchecked(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CleanUpper is an optional interfaces for Fs
|
// CleanUpper is an optional interfaces for Fs
|
||||||
|
|
|
@ -31,7 +31,9 @@ func (o mockObject) Size() int64 { return
|
||||||
func (o mockObject) Storable() bool { return true }
|
func (o mockObject) Storable() bool { return true }
|
||||||
func (o mockObject) SetModTime(time.Time) error { return errNotImpl }
|
func (o mockObject) SetModTime(time.Time) error { return errNotImpl }
|
||||||
func (o mockObject) Open(options ...OpenOption) (io.ReadCloser, error) { return nil, errNotImpl }
|
func (o mockObject) Open(options ...OpenOption) (io.ReadCloser, error) { return nil, errNotImpl }
|
||||||
func (o mockObject) Update(in io.Reader, src ObjectInfo) error { return errNotImpl }
|
func (o mockObject) Update(in io.Reader, src ObjectInfo, options ...OpenOption) error {
|
||||||
|
return errNotImpl
|
||||||
|
}
|
||||||
func (o mockObject) Remove() error { return errNotImpl }
|
func (o mockObject) Remove() error { return errNotImpl }
|
||||||
|
|
||||||
type mockFs struct {
|
type mockFs struct {
|
||||||
|
|
|
@ -268,6 +268,17 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
|
||||||
maxTries := Config.LowLevelRetries
|
maxTries := Config.LowLevelRetries
|
||||||
tries := 0
|
tries := 0
|
||||||
doUpdate := dst != nil
|
doUpdate := dst != nil
|
||||||
|
// work out which hash to use - limit to 1 hash in common
|
||||||
|
var common HashSet
|
||||||
|
hashType := HashNone
|
||||||
|
if !Config.SizeOnly {
|
||||||
|
common = src.Fs().Hashes().Overlap(f.Hashes())
|
||||||
|
if common.Count() > 0 {
|
||||||
|
hashType = common.GetOne()
|
||||||
|
common = HashSet(hashType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hashOption := &HashesOption{Hashes: common}
|
||||||
var actionTaken string
|
var actionTaken string
|
||||||
for {
|
for {
|
||||||
// Try server side copy first - if has optional interface and
|
// Try server side copy first - if has optional interface and
|
||||||
|
@ -285,7 +296,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
|
||||||
// If can't server side copy, do it manually
|
// If can't server side copy, do it manually
|
||||||
if err == ErrorCantCopy {
|
if err == ErrorCantCopy {
|
||||||
var in0 io.ReadCloser
|
var in0 io.ReadCloser
|
||||||
in0, err = src.Open()
|
in0, err = src.Open(hashOption)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = errors.Wrap(err, "failed to open source object")
|
err = errors.Wrap(err, "failed to open source object")
|
||||||
} else {
|
} else {
|
||||||
|
@ -297,10 +308,10 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
|
||||||
}
|
}
|
||||||
if doUpdate {
|
if doUpdate {
|
||||||
actionTaken = "Copied (replaced existing)"
|
actionTaken = "Copied (replaced existing)"
|
||||||
err = dst.Update(in, wrappedSrc)
|
err = dst.Update(in, wrappedSrc, hashOption)
|
||||||
} else {
|
} else {
|
||||||
actionTaken = "Copied (new)"
|
actionTaken = "Copied (new)"
|
||||||
dst, err = f.Put(in, wrappedSrc)
|
dst, err = f.Put(in, wrappedSrc, hashOption)
|
||||||
}
|
}
|
||||||
closeErr := in.Close()
|
closeErr := in.Close()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -338,12 +349,7 @@ func Copy(f Fs, dst Object, remote string, src Object) (err error) {
|
||||||
// Verify hashes are the same after transfer - ignoring blank hashes
|
// Verify hashes are the same after transfer - ignoring blank hashes
|
||||||
// TODO(klauspost): This could be extended, so we always create a hash type matching
|
// TODO(klauspost): This could be extended, so we always create a hash type matching
|
||||||
// the destination, and calculate it while sending.
|
// the destination, and calculate it while sending.
|
||||||
common := src.Fs().Hashes().Overlap(dst.Fs().Hashes())
|
if hashType != HashNone {
|
||||||
// Debugf(src, "common hashes: %v", common)
|
|
||||||
if !Config.SizeOnly && common.Count() > 0 {
|
|
||||||
// Get common hash type
|
|
||||||
hashType := common.GetOne()
|
|
||||||
|
|
||||||
var srcSum string
|
var srcSum string
|
||||||
srcSum, err = src.Hash(hashType)
|
srcSum, err = src.Hash(hashType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -94,6 +94,27 @@ func (o *HTTPOption) Mandatory() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HashesOption defines an option used to tell the local fs to limit
|
||||||
|
// the number of hashes it calculates.
|
||||||
|
type HashesOption struct {
|
||||||
|
Hashes HashSet
|
||||||
|
}
|
||||||
|
|
||||||
|
// Header formats the option as an http header
|
||||||
|
func (o *HashesOption) Header() (key string, value string) {
|
||||||
|
return "", ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// String formats the option into human readable form
|
||||||
|
func (o *HashesOption) String() string {
|
||||||
|
return fmt.Sprintf("HashesOption(%v)", o.Hashes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mandatory returns whether the option must be parsed or can be ignored
|
||||||
|
func (o *HashesOption) Mandatory() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// OpenOptionAddHeaders adds each header found in options to the
|
// OpenOptionAddHeaders adds each header found in options to the
|
||||||
// headers map provided the key was non empty.
|
// headers map provided the key was non empty.
|
||||||
func OpenOptionAddHeaders(options []OpenOption, headers map[string]string) {
|
func OpenOptionAddHeaders(options []OpenOption, headers map[string]string) {
|
||||||
|
|
|
@ -380,7 +380,7 @@ func (f *Fs) Precision() time.Duration {
|
||||||
// May create the object even if it returns an error - if so
|
// May create the object even if it returns an error - if so
|
||||||
// will return the object and the error, otherwise will return
|
// will return the object and the error, otherwise will return
|
||||||
// nil and the error
|
// nil and the error
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
// fs.Debugf(f, "Trying to put file %s", src.Remote())
|
// fs.Debugf(f, "Trying to put file %s", src.Remote())
|
||||||
err := f.mkParentDir(src.Remote())
|
err := f.mkParentDir(src.Remote())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -390,7 +390,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
||||||
fs: f,
|
fs: f,
|
||||||
remote: src.Remote(),
|
remote: src.Remote(),
|
||||||
}
|
}
|
||||||
err = o.Update(in, src)
|
err = o.Update(in, src, options...)
|
||||||
return o, err
|
return o, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -674,7 +674,7 @@ func (o *Object) Open(options ...fs.OpenOption) (rc io.ReadCloser, err error) {
|
||||||
// Copy the reader into the object updating modTime and size
|
// Copy the reader into the object updating modTime and size
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
|
||||||
// defer fs.Trace(o, "src=%v", src)("err=%v", &err)
|
// defer fs.Trace(o, "src=%v", src)("err=%v", &err)
|
||||||
path := path.Join(o.fs.root, o.remote)
|
path := path.Join(o.fs.root, o.remote)
|
||||||
// remove the file if upload failed
|
// remove the file if upload failed
|
||||||
|
|
|
@ -445,13 +445,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
|
||||||
// Copy the reader in to the new object which is returned
|
// Copy the reader in to the new object which is returned
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
// Temporary Object under construction
|
// Temporary Object under construction
|
||||||
o := &Object{
|
o := &Object{
|
||||||
fs: f,
|
fs: f,
|
||||||
remote: src.Remote(),
|
remote: src.Remote(),
|
||||||
}
|
}
|
||||||
return o, o.Update(in, src)
|
return o, o.Update(in, src, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mkdir creates the bucket if it doesn't exist
|
// Mkdir creates the bucket if it doesn't exist
|
||||||
|
@ -683,7 +683,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||||
// Update the object with the contents of the io.Reader, modTime and size
|
// Update the object with the contents of the io.Reader, modTime and size
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
size := src.Size()
|
size := src.Size()
|
||||||
modTime := src.ModTime()
|
modTime := src.ModTime()
|
||||||
|
|
||||||
|
|
|
@ -372,11 +372,11 @@ func (m *mapper) Save(in, out string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put the Object to the local filesystem
|
// Put the Object to the local filesystem
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
remote := src.Remote()
|
remote := src.Remote()
|
||||||
// Temporary Object under construction - info filled in by Update()
|
// Temporary Object under construction - info filled in by Update()
|
||||||
o := f.newObject(remote, "")
|
o := f.newObject(remote, "")
|
||||||
err := o.Update(in, src)
|
err := o.Update(in, src, options...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -707,10 +707,13 @@ func (file *localOpenFile) Close() (err error) {
|
||||||
// Open an object for read
|
// Open an object for read
|
||||||
func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||||
var offset int64
|
var offset int64
|
||||||
|
hashes := fs.SupportedHashes
|
||||||
for _, option := range options {
|
for _, option := range options {
|
||||||
switch x := option.(type) {
|
switch x := option.(type) {
|
||||||
case *fs.SeekOption:
|
case *fs.SeekOption:
|
||||||
offset = x.Offset
|
offset = x.Offset
|
||||||
|
case *fs.HashesOption:
|
||||||
|
hashes = x.Hashes
|
||||||
default:
|
default:
|
||||||
if option.Mandatory() {
|
if option.Mandatory() {
|
||||||
fs.Logf(o, "Unsupported mandatory option: %v", option)
|
fs.Logf(o, "Unsupported mandatory option: %v", option)
|
||||||
|
@ -728,11 +731,15 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||||
// don't attempt to make checksums
|
// don't attempt to make checksums
|
||||||
return fd, err
|
return fd, err
|
||||||
}
|
}
|
||||||
|
hash, err := fs.NewMultiHasherTypes(hashes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
// Update the md5sum as we go along
|
// Update the md5sum as we go along
|
||||||
in = &localOpenFile{
|
in = &localOpenFile{
|
||||||
o: o,
|
o: o,
|
||||||
in: fd,
|
in: fd,
|
||||||
hash: fs.NewMultiHasher(),
|
hash: hash,
|
||||||
}
|
}
|
||||||
return in, nil
|
return in, nil
|
||||||
}
|
}
|
||||||
|
@ -744,7 +751,15 @@ func (o *Object) mkdirAll() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the object from in with modTime and size
|
// Update the object from in with modTime and size
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
|
hashes := fs.SupportedHashes
|
||||||
|
for _, option := range options {
|
||||||
|
switch x := option.(type) {
|
||||||
|
case *fs.HashesOption:
|
||||||
|
hashes = x.Hashes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err := o.mkdirAll()
|
err := o.mkdirAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -756,7 +771,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate the hash of the object we are reading as we go along
|
// Calculate the hash of the object we are reading as we go along
|
||||||
hash := fs.NewMultiHasher()
|
hash, err := fs.NewMultiHasherTypes(hashes)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
in = io.TeeReader(in, hash)
|
in = io.TeeReader(in, hash)
|
||||||
|
|
||||||
_, err = io.Copy(out, in)
|
_, err = io.Copy(out, in)
|
||||||
|
|
|
@ -468,7 +468,7 @@ func (f *Fs) createObject(remote string, modTime time.Time, size int64) (o *Obje
|
||||||
// Copy the reader in to the new object which is returned
|
// Copy the reader in to the new object which is returned
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
remote := src.Remote()
|
remote := src.Remote()
|
||||||
size := src.Size()
|
size := src.Size()
|
||||||
modTime := src.ModTime()
|
modTime := src.ModTime()
|
||||||
|
@ -477,7 +477,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return o, o.Update(in, src)
|
return o, o.Update(in, src, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mkdir creates the container if it doesn't exist
|
// Mkdir creates the container if it doesn't exist
|
||||||
|
@ -1007,7 +1007,7 @@ func (o *Object) uploadMultipart(in io.Reader, size int64) (err error) {
|
||||||
// Update the object with the contents of the io.Reader, modTime and size
|
// Update the object with the contents of the io.Reader, modTime and size
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) (err error) {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
|
||||||
o.fs.tokenRenewer.Start()
|
o.fs.tokenRenewer.Start()
|
||||||
defer o.fs.tokenRenewer.Stop()
|
defer o.fs.tokenRenewer.Stop()
|
||||||
|
|
||||||
|
|
6
s3/s3.go
6
s3/s3.go
|
@ -624,13 +624,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put the Object into the bucket
|
// Put the Object into the bucket
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
// Temporary Object under construction
|
// Temporary Object under construction
|
||||||
fs := &Object{
|
fs := &Object{
|
||||||
fs: f,
|
fs: f,
|
||||||
remote: src.Remote(),
|
remote: src.Remote(),
|
||||||
}
|
}
|
||||||
return fs, fs.Update(in, src)
|
return fs, fs.Update(in, src, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the bucket exists
|
// Check if the bucket exists
|
||||||
|
@ -902,7 +902,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the Object from in with modTime and size
|
// Update the Object from in with modTime and size
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
modTime := src.ModTime()
|
modTime := src.ModTime()
|
||||||
|
|
||||||
uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
|
uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
|
||||||
|
|
|
@ -290,7 +290,7 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put data from <in> into a new remote sftp file object described by <src.Remote()> and <src.ModTime()>
|
// Put data from <in> into a new remote sftp file object described by <src.Remote()> and <src.ModTime()>
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
err := f.mkParentDir(src.Remote())
|
err := f.mkParentDir(src.Remote())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "Put mkParentDir failed")
|
return nil, errors.Wrap(err, "Put mkParentDir failed")
|
||||||
|
@ -300,7 +300,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
||||||
fs: f,
|
fs: f,
|
||||||
remote: src.Remote(),
|
remote: src.Remote(),
|
||||||
}
|
}
|
||||||
err = o.Update(in, src)
|
err = o.Update(in, src, options...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -543,7 +543,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update a remote sftp file using the data <in> and ModTime from <src>
|
// Update a remote sftp file using the data <in> and ModTime from <src>
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
file, err := o.fs.sftpClient.Create(o.path())
|
file, err := o.fs.sftpClient.Create(o.path())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "Update Create failed")
|
return errors.Wrap(err, "Update Create failed")
|
||||||
|
|
|
@ -399,13 +399,13 @@ func (f *Fs) List(out fs.ListOpts, dir string) {
|
||||||
// Copy the reader in to the new object which is returned
|
// Copy the reader in to the new object which is returned
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
// Temporary Object under construction
|
// Temporary Object under construction
|
||||||
fs := &Object{
|
fs := &Object{
|
||||||
fs: f,
|
fs: f,
|
||||||
remote: src.Remote(),
|
remote: src.Remote(),
|
||||||
}
|
}
|
||||||
return fs, fs.Update(in, src)
|
return fs, fs.Update(in, src, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mkdir creates the container if it doesn't exist
|
// Mkdir creates the container if it doesn't exist
|
||||||
|
@ -737,7 +737,7 @@ func (o *Object) updateChunks(in io.Reader, headers swift.Headers, size int64, c
|
||||||
// Update the object with the contents of the io.Reader, modTime and size
|
// Update the object with the contents of the io.Reader, modTime and size
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
size := src.Size()
|
size := src.Size()
|
||||||
modTime := src.ModTime()
|
modTime := src.ModTime()
|
||||||
|
|
||||||
|
|
|
@ -397,7 +397,7 @@ func (o *Object) readMetaData() (err error) {
|
||||||
// Copy the reader in to the new object which is returned
|
// Copy the reader in to the new object which is returned
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
remote := src.Remote()
|
remote := src.Remote()
|
||||||
size := src.Size()
|
size := src.Size()
|
||||||
modTime := src.ModTime()
|
modTime := src.ModTime()
|
||||||
|
@ -409,7 +409,7 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) {
|
||||||
modTime: modTime,
|
modTime: modTime,
|
||||||
}
|
}
|
||||||
//TODO maybe read metadata after upload to check if file uploaded successfully
|
//TODO maybe read metadata after upload to check if file uploaded successfully
|
||||||
return o, o.Update(in, src)
|
return o, o.Update(in, src, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mkdir creates the container if it doesn't exist
|
// Mkdir creates the container if it doesn't exist
|
||||||
|
@ -556,7 +556,7 @@ func (o *Object) remotePath() string {
|
||||||
// Copy the reader into the object updating modTime and size
|
// Copy the reader into the object updating modTime and size
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo) error {
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
size := src.Size()
|
size := src.Size()
|
||||||
modTime := src.ModTime()
|
modTime := src.ModTime()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue