cache: add support for PutStream - fixes #1836

This commit is contained in:
remusb 2017-11-30 21:16:45 +02:00
parent 47450ba326
commit 4af4bbb539

45
cache/cache.go vendored
View file

@ -376,6 +376,7 @@ func NewFs(name, rpath string) (fs.Fs, error) {
DirChangeNotify: nil, DirChangeNotify: nil,
DirCacheFlush: f.DirCacheFlush, DirCacheFlush: f.DirCacheFlush,
PutUnchecked: f.PutUnchecked, PutUnchecked: f.PutUnchecked,
PutStream: f.PutStream,
CleanUp: f.CleanUp, CleanUp: f.CleanUp,
UnWrap: f.UnWrap, UnWrap: f.UnWrap,
}).Fill(f).Mask(wrappedFs) }).Fill(f).Mask(wrappedFs)
@ -689,18 +690,18 @@ func (f *Fs) cacheReader(u io.Reader, src fs.ObjectInfo, originalRead func(inn i
} }
} }
// Put in to the remote path with the modTime given of the given size type putFn func(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
fs.Debugf(f, "put data at '%s'", src.Remote())
// put in to the remote path
func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (fs.Object, error) {
var err error var err error
var obj fs.Object var obj fs.Object
if f.cacheWrites { if f.cacheWrites {
f.cacheReader(in, src, func(inn io.Reader) { f.cacheReader(in, src, func(inn io.Reader) {
obj, err = f.Fs.Put(inn, src, options...) obj, err = put(inn, src, options...)
}) })
} else { } else {
obj, err = f.Fs.Put(in, src, options...) obj, err = put(in, src, options...)
} }
if err != nil { if err != nil {
@ -714,6 +715,12 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
return cachedObj, nil return cachedObj, nil
} }
// Put in to the remote path with the modTime given of the given size
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
fs.Debugf(f, "put data at '%s'", src.Remote())
return f.put(in, src, options, f.Fs.Put)
}
// PutUnchecked uploads the object // PutUnchecked uploads the object
func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
do := f.Fs.Features().PutUnchecked do := f.Fs.Features().PutUnchecked
@ -721,26 +728,17 @@ func (f *Fs) PutUnchecked(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOpt
return nil, errors.New("can't PutUnchecked") return nil, errors.New("can't PutUnchecked")
} }
fs.Infof(f, "put data unchecked in '%s'", src.Remote()) fs.Infof(f, "put data unchecked in '%s'", src.Remote())
return f.put(in, src, options, do)
}
var err error // PutStream uploads the object
var obj fs.Object func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
if f.cacheWrites { do := f.Fs.Features().PutStream
f.cacheReader(in, src, func(inn io.Reader) { if do == nil {
obj, err = f.Fs.Put(inn, src, options...) return nil, errors.New("can't PutStream")
})
} else {
obj, err = f.Fs.Put(in, src, options...)
} }
fs.Infof(f, "put data streaming in '%s'", src.Remote())
if err != nil { return f.put(in, src, options, do)
fs.Errorf(src, "error saving in cache: %v", err)
return nil, err
}
cachedObj := ObjectFromOriginal(f, obj).persist()
// clean cache
go f.CleanUpCache(false)
return cachedObj, nil
} }
// Copy src to this remote using server side copy operations. // Copy src to this remote using server side copy operations.
@ -962,6 +960,7 @@ var (
_ fs.Mover = (*Fs)(nil) _ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil) _ fs.DirMover = (*Fs)(nil)
_ fs.PutUncheckeder = (*Fs)(nil) _ fs.PutUncheckeder = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.CleanUpper = (*Fs)(nil) _ fs.CleanUpper = (*Fs)(nil)
_ fs.UnWrapper = (*Fs)(nil) _ fs.UnWrapper = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil) _ fs.ListRer = (*Fs)(nil)