compress: correctly handle wrapping of remotes without PutStream

Also fixes ObjectInfo wrapping for Hash and Size - fixes #4928
This commit is contained in:
buengese 2021-01-17 02:04:26 +01:00
parent 80e63af470
commit d8984cd37f

View file

@ -12,6 +12,8 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os"
"regexp" "regexp"
"strings" "strings"
"time" "time"
@ -27,6 +29,7 @@ import (
"github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/fspath" "github.com/rclone/rclone/fs/fspath"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/object"
"github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/fs/operations"
) )
@ -89,6 +92,16 @@ func init() {
Level 0 turns off compression.`, Level 0 turns off compression.`,
Default: sgzip.DefaultCompression, Default: sgzip.DefaultCompression,
Advanced: true, Advanced: true,
}, {
Name: "ram_cache_limit",
Help: `Some remotes don't allow the upload of files with unknown size.
In this case the compressed file will need to be cached to determine
it's size.
Files smaller than this limit will be cached in RAM, file larger than
this limit will be cached on disk`,
Default: fs.SizeSuffix(20 * 1024 * 1024),
Advanced: true,
}}, }},
}) })
} }
@ -98,6 +111,7 @@ type Options struct {
Remote string `config:"remote"` Remote string `config:"remote"`
CompressionMode string `config:"mode"` CompressionMode string `config:"mode"`
CompressionLevel int `config:"level"` CompressionLevel int `config:"level"`
RAMCacheLimit fs.SizeSuffix `config:"ram_cache_limit"`
} }
/*** FILESYSTEM FUNCTIONS ***/ /*** FILESYSTEM FUNCTIONS ***/
@ -416,8 +430,55 @@ type compressionResult struct {
meta sgzip.GzipMetadata meta sgzip.GzipMetadata
} }
// replicating some of operations.Rcat functionality because we want to support remotes without streaming
// support and of course cannot know the size of a compressed file before compressing it.
func (f *Fs) rcat(ctx context.Context, dstFileName string, in io.ReadCloser, modTime time.Time, options []fs.OpenOption) (o fs.Object, err error) {
// cache small files in memory and do normal upload
buf := make([]byte, f.opt.RAMCacheLimit)
if n, err := io.ReadFull(in, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
src := object.NewStaticObjectInfo(dstFileName, modTime, int64(len(buf[:n])), false, nil, f.Fs)
return f.Fs.Put(ctx, bytes.NewBuffer(buf[:n]), src, options...)
}
// Need to include what we allready read
in = &ReadCloserWrapper{
Reader: io.MultiReader(bytes.NewReader(buf), in),
Closer: in,
}
canStream := f.Fs.Features().PutStream != nil
if canStream {
src := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, f.Fs)
return f.Fs.Features().PutStream(ctx, in, src, options...)
}
fs.Debugf(f, "Target remote doesn't support streaming uploads, creating temporary local file")
tempFile, err := ioutil.TempFile("", "rclone-press-")
defer func() {
// these errors should be relatively uncritical and the upload should've succeeded so it's okay-ish
// to ignore them
_ = tempFile.Close()
_ = os.Remove(tempFile.Name())
}()
if err != nil {
return nil, errors.Wrap(err, "Failed to create temporary local FS to spool file")
}
if _, err = io.Copy(tempFile, in); err != nil {
return nil, errors.Wrap(err, "Failed to write temporary local file")
}
if _, err = tempFile.Seek(0, 0); err != nil {
return nil, err
}
finfo, err := tempFile.Stat()
if err != nil {
return nil, err
}
return f.Fs.Put(ctx, tempFile, object.NewStaticObjectInfo(dstFileName, modTime, finfo.Size(), false, nil, f.Fs))
}
// Put a compressed version of a file. Returns a wrappable object and metadata. // Put a compressed version of a file. Returns a wrappable object and metadata.
func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string) (fs.Object, *ObjectMetadata, error) { func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, mimeType string) (fs.Object, *ObjectMetadata, error) {
// Unwrap reader accounting // Unwrap reader accounting
in, wrap := accounting.UnWrap(in) in, wrap := accounting.UnWrap(in)
@ -471,7 +532,7 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o
} }
// Transfer the data // Transfer the data
o, err := put(ctx, wrappedIn, f.wrapInfo(src, makeDataName(src.Remote(), src.Size(), f.mode), src.Size()), options...) o, err := f.rcat(ctx, makeDataName(src.Remote(), src.Size(), f.mode), ioutil.NopCloser(wrappedIn), src.ModTime(ctx), options)
//o, err := operations.Rcat(ctx, f.Fs, makeDataName(src.Remote(), src.Size(), f.mode), ioutil.NopCloser(wrappedIn), src.ModTime(ctx)) //o, err := operations.Rcat(ctx, f.Fs, makeDataName(src.Remote(), src.Size(), f.mode), ioutil.NopCloser(wrappedIn), src.ModTime(ctx))
if err != nil { if err != nil {
if o != nil { if o != nil {
@ -510,7 +571,7 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o
} }
// Put an uncompressed version of a file. Returns a wrappable object and metadata. // Put an uncompressed version of a file. Returns a wrappable object and metadata.
func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string) (fs.Object, *ObjectMetadata, error) { func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, put putFn, options []fs.OpenOption, mimeType string) (fs.Object, *ObjectMetadata, error) {
// Unwrap the accounting, add our metadata hasher, then wrap it back on // Unwrap the accounting, add our metadata hasher, then wrap it back on
in, wrap := accounting.UnWrap(in) in, wrap := accounting.UnWrap(in)
@ -577,6 +638,8 @@ func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.Objec
// This function will put both the data and metadata for an Object. // This function will put both the data and metadata for an Object.
// putData is the function used for data, while putMeta is the function used for metadata. // putData is the function used for data, while putMeta is the function used for metadata.
// The putData function will only be used when the object is not compressible if the
// data is compressible this parameter will be ignored.
func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption,
putData putFn, putMeta putFn, compressible bool, mimeType string) (*Object, error) { putData putFn, putMeta putFn, compressible bool, mimeType string) (*Object, error) {
// Put file then metadata // Put file then metadata
@ -584,9 +647,9 @@ func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.Ob
var meta *ObjectMetadata var meta *ObjectMetadata
var err error var err error
if compressible { if compressible {
dataObject, meta, err = f.putCompress(ctx, in, src, options, putData, mimeType) dataObject, meta, err = f.putCompress(ctx, in, src, options, mimeType)
} else { } else {
dataObject, meta, err = f.putUncompress(ctx, in, src, options, putData, mimeType) dataObject, meta, err = f.putUncompress(ctx, in, src, putData, options, mimeType)
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -837,7 +900,7 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string
func (f *Fs) CleanUp(ctx context.Context) error { func (f *Fs) CleanUp(ctx context.Context) error {
do := f.Fs.Features().CleanUp do := f.Fs.Features().CleanUp
if do == nil { if do == nil {
return errors.New("can't CleanUp") return errors.New("can't CleanUp: not supported by underlying remote")
} }
return do(ctx) return do(ctx)
} }
@ -846,7 +909,7 @@ func (f *Fs) CleanUp(ctx context.Context) error {
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
do := f.Fs.Features().About do := f.Fs.Features().About
if do == nil { if do == nil {
return nil, errors.New("About not supported") return nil, errors.New("can't About: not supported by underlying remote")
} }
return do(ctx) return do(ctx)
} }
@ -922,7 +985,7 @@ func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryT
func (f *Fs) PublicLink(ctx context.Context, remote string, duration fs.Duration, unlink bool) (string, error) { func (f *Fs) PublicLink(ctx context.Context, remote string, duration fs.Duration, unlink bool) (string, error) {
do := f.Fs.Features().PublicLink do := f.Fs.Features().PublicLink
if do == nil { if do == nil {
return "", errors.New("PublicLink not supported") return "", errors.New("can't PublicLink: not supported by underlying remote")
} }
o, err := f.NewObject(ctx, remote) o, err := f.NewObject(ctx, remote)
if err != nil { if err != nil {
@ -1033,7 +1096,8 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
} }
} }
} else { } else {
// Function that updates object // We can only support update when BOTH the old and the new object are uncompressed because only then
// the filesize will be known beforehand and name will stay the same
update := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { update := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return o.Object, o.Object.Update(ctx, in, src, options...) return o.Object, o.Object.Update(ctx, in, src, options...)
} }
@ -1121,7 +1185,7 @@ func (o *Object) String() string {
func (o *Object) Remote() string { func (o *Object) Remote() string {
origFileName, _, _, err := processFileName(o.Object.Remote()) origFileName, _, _, err := processFileName(o.Object.Remote())
if err != nil { if err != nil {
fs.Errorf(o, "Could not get remote path for: %s", o.Object.Remote()) fs.Errorf(o.f, "Could not get remote path for: %s", o.Object.Remote())
return o.Object.Remote() return o.Object.Remote()
} }
return origFileName return origFileName
@ -1161,15 +1225,19 @@ func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error) {
// multiple storage classes supported // multiple storage classes supported
func (o *Object) SetTier(tier string) error { func (o *Object) SetTier(tier string) error {
do, ok := o.Object.(fs.SetTierer) do, ok := o.Object.(fs.SetTierer)
mdo, ok := o.mo.(fs.SetTierer)
if !ok { if !ok {
return errors.New("press: underlying remote does not support SetTier") return errors.New("press: underlying remote does not support SetTier")
} }
if err := mdo.SetTier(tier); err != nil {
return err
}
return do.SetTier(tier) return do.SetTier(tier)
} }
// GetTier returns storage tier or class of the Object // GetTier returns storage tier or class of the Object
func (o *Object) GetTier() string { func (o *Object) GetTier() string {
do, ok := o.Object.(fs.GetTierer) do, ok := o.mo.(fs.GetTierer)
if !ok { if !ok {
return "" return ""
} }
@ -1272,10 +1340,7 @@ func (o *ObjectInfo) Remote() string {
// Size returns the size of the file // Size returns the size of the file
func (o *ObjectInfo) Size() int64 { func (o *ObjectInfo) Size() int64 {
if o.size != -1 {
return o.size return o.size
}
return o.src.Size()
} }
// ModTime returns the modification time // ModTime returns the modification time
@ -1286,14 +1351,7 @@ func (o *ObjectInfo) ModTime(ctx context.Context) time.Time {
// Hash returns the selected checksum of the file // Hash returns the selected checksum of the file
// If no checksum is available it returns "" // If no checksum is available it returns ""
func (o *ObjectInfo) Hash(ctx context.Context, ht hash.Type) (string, error) { func (o *ObjectInfo) Hash(ctx context.Context, ht hash.Type) (string, error) {
if ht != hash.MD5 { return "", nil // cannot know the checksum
return "", hash.ErrUnsupported
}
value, err := o.src.Hash(ctx, ht)
if err == hash.ErrUnsupported {
return "", hash.ErrUnsupported
}
return value, err
} }
// ID returns the ID of the Object if known, or "" if not // ID returns the ID of the Object if known, or "" if not