diff --git a/backend/all/all.go b/backend/all/all.go index 0a0b03150..dcda89ca6 100644 --- a/backend/all/all.go +++ b/backend/all/all.go @@ -18,6 +18,7 @@ import ( _ "github.com/rclone/rclone/backend/ftp" _ "github.com/rclone/rclone/backend/googlecloudstorage" _ "github.com/rclone/rclone/backend/googlephotos" + _ "github.com/rclone/rclone/backend/gzip" _ "github.com/rclone/rclone/backend/hdfs" _ "github.com/rclone/rclone/backend/http" _ "github.com/rclone/rclone/backend/hubic" diff --git a/backend/gzip/gzip.go b/backend/gzip/gzip.go new file mode 100644 index 000000000..00246ac5e --- /dev/null +++ b/backend/gzip/gzip.go @@ -0,0 +1,578 @@ +// Package gzip provides wrappers for Fs and Object which implement compression +package gzip + +import ( + "compress/gzip" + "context" + "fmt" + "io" + "io/ioutil" + "strings" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/cache" + "github.com/rclone/rclone/fs/config/configmap" + "github.com/rclone/rclone/fs/config/configstruct" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/lib/readers" +) + +// Globals +// Register with Fs +func init() { + fs.Register(&fs.RegInfo{ + Name: "gzip", + Description: "Compress/Decompress a remote", + NewFs: NewFs, + Options: []fs.Option{{ + Name: "remote", + Help: "Remote to compress/decompress.\nNormally should contain a ':' and a path, eg \"myremote:path/to/dir\",\n\"myremote:bucket\" or maybe \"myremote:\" (not recommended).", + Required: true, + }}, + }) +} + +// NewFs constructs an Fs from the path, container:path +func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) { + // Parse config into Options struct + opt := new(Options) + err := configstruct.Set(m, opt) + if err != nil { + return nil, err + } + // remote := opt.Remote + // if strings.HasPrefix(remote, name+":") { + // return nil, errors.New("can't point gzip remote at itself - check the value of the remote setting") + // } + // wInfo, wName, wPath, wConfig, err := fs.ConfigFs(remote) + // if err != nil { + // return nil, errors.Wrapf(err, "failed to parse remote %q to wrap", remote) + // } + // Make sure to remove trailing . referring to the current dir + // if path.Base(rpath) == "." { + // rpath = strings.TrimSuffix(rpath, ".") + // } + // Look for a file first + // remotePath := fspath.JoinRootPath(wPath, rpath) + // wrappedFs, err := wInfo.NewFs(wName, remotePath, wConfig) + // if that didn't produce a file, look for a directory + // if err != fs.ErrorIsFile { + // remotePath = fspath.JoinRootPath(wPath, rpath) + // wrappedFs, err = wInfo.NewFs(wName, remotePath, wConfig) + // } + // if err != fs.ErrorIsFile && err != nil { + // return nil, errors.Wrapf(err, "failed to make remote %s:%q to wrap", wName, remotePath) + // } + + // Create the remote from the cache + wrappedFs, err := cache.Get(ctx, root) + if err != fs.ErrorIsFile && err != nil { + return nil, errors.Wrapf(err, "failed to make %q to wrap", root) + } + f := &Fs{ + Fs: wrappedFs, + name: name, + root: root, + opt: *opt, + } + // the features here are ones we could support, and they are + // ANDed with the ones from wrappedFs + f.features = (&fs.Features{ + CaseInsensitive: true, + DuplicateFiles: true, + ReadMimeType: false, // MimeTypes not supported with gzip + WriteMimeType: false, + BucketBased: true, + CanHaveEmptyDirectories: true, + SetTier: true, + GetTier: true, + }).Fill(ctx, f).Mask(ctx, wrappedFs).WrapsFs(f, wrappedFs) + + return f, err +} + +// Options defines the configuration for this backend +type Options struct { + Remote string `config:"remote"` +} + +// Fs represents a wrapped fs.Fs +type Fs struct { + fs.Fs + wrapper fs.Fs + name string + root string + opt Options + features *fs.Features // optional features +} + +// Name of the remote (as passed into NewFs) +func (f *Fs) Name() string { + return f.name +} + +// Root of the remote (as passed into NewFs) +func (f *Fs) Root() string { + return f.root +} + +// Features returns the optional features of this Fs +func (f *Fs) Features() *fs.Features { + return f.features +} + +// String returns a description of the FS +func (f *Fs) String() string { + return fmt.Sprintf("Compressed drive '%s:%s'", f.name, f.root) +} + +// decompress some directory entries. This alters entries returning +// it as newEntries. +func (f *Fs) decompressEntries(ctx context.Context, entries fs.DirEntries) (newEntries fs.DirEntries, err error) { + newEntries = entries[:0] // in place filter + for _, entry := range entries { + switch x := entry.(type) { + case fs.Object: + // FIXME decide if decompressing here or not + if strings.HasSuffix(x.Remote(), ".gz") { + newEntries = append(newEntries, f.newObject(x)) + } else { + newEntries = append(newEntries, entry) + } + case fs.Directory: + newEntries = append(newEntries, entry) + default: + return nil, errors.Errorf("Unknown object type %T", entry) + } + } + return newEntries, nil +} + +// List the objects and directories in dir into entries. The +// entries can be returned in any order but should be for a +// complete directory. +// +// dir should be "" to list the root, and should not have +// trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't +// found. +func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { + entries, err = f.Fs.List(ctx, dir) + if err != nil { + return nil, err + } + return f.decompressEntries(ctx, entries) +} + +// ListR lists the objects and directories of the Fs starting +// from dir recursively into out. +// +// dir should be "" to start from the root, and should not +// have trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't +// found. +// +// It should call callback for each tranche of entries read. +// These need not be returned in any particular order. If +// callback returns an error then the listing will stop +// immediately. +// +// Don't implement this unless you have a more efficient way +// of listing recursively that doing a directory traversal. +func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { + return f.Fs.Features().ListR(ctx, dir, func(entries fs.DirEntries) error { + newEntries, err := f.decompressEntries(ctx, entries) + if err != nil { + return err + } + return callback(newEntries) + }) +} + +// returns the underlying name of in +func (f *Fs) compressFileName(in string) string { + return in + ".gz" +} + +// returns the compressed name of in +func (f *Fs) decompressFileName(in string) string { + if strings.HasSuffix(in, ".gz") { + in = in[:len(in)-3] + } + return in +} + +// A small wrapper to make sure we close the gzipper and the +// underlying handle +type decompress struct { + io.ReadCloser + underlying io.Closer +} + +// Close the decompress object +func (d *decompress) Close() error { + err := d.ReadCloser.Close() + err2 := d.underlying.Close() + if err == nil { + err = err2 + } + return err +} + +// Wrap in in a decompress object +func (f *Fs) decompressData(in io.ReadCloser) (io.ReadCloser, error) { + rc, err := gzip.NewReader(in) + if err != nil { + return nil, err + } + return &decompress{ReadCloser: rc, underlying: in}, nil +} + +type compress struct { + gzipWriter *gzip.Writer + pipeReader *io.PipeReader + pipeWriter *io.PipeWriter + errChan chan error +} + +func (c *compress) Read(p []byte) (n int, err error) { + // fs.Debugf(nil, "Read(%d)", len(p)) + n, err = readers.ReadFill(c.pipeReader, p) + if err != nil && err != io.EOF { + _ = c.pipeWriter.CloseWithError(err) + } + // fs.Debugf(nil, "= %d, %v", n, err) + return n, err +} + +func (c *compress) Close() (err error) { + // fs.Debugf(nil, "Close()") + _ = c.pipeReader.Close() + err = <-c.errChan + // fs.Debugf(nil, "= %v", err) + return err +} + +func (f *Fs) compressData(in io.Reader) (rc io.ReadCloser, err error) { + c := &compress{ + errChan: make(chan error, 1), + } + c.pipeReader, c.pipeWriter = io.Pipe() + c.gzipWriter, err = gzip.NewWriterLevel(c.pipeWriter, 9) // FIXME + if err != nil { + return nil, err + } + go func() { + _, err := io.Copy(c.gzipWriter, in) + err2 := c.gzipWriter.Close() + if err == nil { + err = err2 + } + _ = c.pipeWriter.CloseWithError(err) + c.errChan <- err + }() + return c, nil +} + +// NewObject finds the Object at remote. +func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { + o, err := f.Fs.NewObject(ctx, f.compressFileName(remote)) + if err != nil { + return nil, err + } + return f.newObject(o), nil +} + +type putFn func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) + +// put implements Put or PutStream +func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (fs.Object, error) { + // Compress the data into compressor + compressor, err := f.compressData(in) + if err != nil { + return nil, err + } + + // Transfer the data + o, err := put(ctx, compressor, f.newObjectInfo(src), options...) + if err != nil { + return nil, err + } + + err = compressor.Close() + if err != nil { + return nil, err + } + + return f.newObject(o), nil +} + +// Put in to the remote path with the modTime given of the given size +// +// May create the object even if it returns an error - if so +// will return the object and the error, otherwise will return +// nil and the error +func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + return f.put(ctx, in, src, options, f.Fs.Put) +} + +// PutStream uploads to the remote path with the modTime given of indeterminate size +func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + return f.put(ctx, in, src, options, f.Fs.Features().PutStream) +} + +// Hashes returns the supported hash sets. +func (f *Fs) Hashes() hash.Set { + return hash.Set(hash.None) +} + +// Copy src to this remote using server side copy operations. +// +// This is stored with the remote path given +// +// It returns the destination Object and a possible error +// +// Will only be called if src.Fs().Name() == f.Name() +// +// If it isn't possible then return fs.ErrorCantCopy +func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { + do := f.Fs.Features().Copy + if do == nil { + return nil, fs.ErrorCantCopy + } + o, ok := src.(*Object) + if !ok { + return nil, fs.ErrorCantCopy + } + oResult, err := do(ctx, o.Object, f.compressFileName(remote)) + if err != nil { + return nil, err + } + return f.newObject(oResult), nil +} + +// Move src to this remote using server side move operations. +// +// This is stored with the remote path given +// +// It returns the destination Object and a possible error +// +// Will only be called if src.Fs().Name() == f.Name() +// +// If it isn't possible then return fs.ErrorCantMove +func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { + do := f.Fs.Features().Move + if do == nil { + return nil, fs.ErrorCantMove + } + o, ok := src.(*Object) + if !ok { + return nil, fs.ErrorCantMove + } + oResult, err := do(ctx, o.Object, f.compressFileName(remote)) + if err != nil { + return nil, err + } + return f.newObject(oResult), nil +} + +// PutUnchecked uploads the object +// +// This will create a duplicate if we upload a new file without +// checking to see if there is one already - use Put() for that. +func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + do := f.Fs.Features().PutUnchecked + if do == nil { + return nil, errors.New("can't PutUnchecked") + } + wrappedIn, err := f.compressData(in) + if err != nil { + return nil, err + } + o, err := do(ctx, wrappedIn, f.newObjectInfo(src)) + if err != nil { + return nil, err + } + return f.newObject(o), nil +} + +// UnWrap returns the Fs that this Fs is wrapping +func (f *Fs) UnWrap() fs.Fs { + return f.Fs +} + +// WrapFs returns the Fs that is wrapping this Fs +func (f *Fs) WrapFs() fs.Fs { + return f.wrapper +} + +// SetWrapper sets the Fs that is wrapping this Fs +func (f *Fs) SetWrapper(wrapper fs.Fs) { + f.wrapper = wrapper +} + +// Object describes a wrapped for being read from the Fs +// +// This decompresss the remote name and decompresss the data +type Object struct { + fs.Object + f *Fs +} + +func (f *Fs) newObject(o fs.Object) *Object { + return &Object{ + Object: o, + f: f, + } +} + +// Fs returns read only access to the Fs that this object is part of +func (o *Object) Fs() fs.Info { + return o.f +} + +// Return a string version +func (o *Object) String() string { + if o == nil { + return "" + } + return o.Remote() +} + +// Remote returns the remote path +func (o *Object) Remote() string { + remote := o.Object.Remote() + return o.f.decompressFileName(remote) +} + +// Size returns the size of the file +func (o *Object) Size() int64 { + return -1 +} + +// Hash returns the selected checksum of the file +// If no checksum is available it returns "" +func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error) { + return "", hash.ErrUnsupported +} + +// UnWrap returns the wrapped Object +func (o *Object) UnWrap() fs.Object { + return o.Object +} + +// Open opens the file for read. Call Close() on the returned io.ReadCloser +func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.ReadCloser, err error) { + var openOptions []fs.OpenOption + var offset, limit int64 = 0, -1 + for _, option := range options { + switch x := option.(type) { + case *fs.SeekOption: + offset = x.Offset + case *fs.RangeOption: + // FIXME o.Size() is wrong! + offset, limit = x.Decode(o.Size()) + default: + // pass on Options to underlying open if appropriate + openOptions = append(openOptions, option) + } + } + rc, err = o.Object.Open(ctx, openOptions...) + if err != nil { + return nil, err + } + rc, err = o.f.decompressData(rc) + if err != nil { + return nil, err + } + // if offset set then discard some data + if offset > 0 { + _, err := io.CopyN(ioutil.Discard, rc, offset) + if err != nil { + return nil, err + } + } + // if limit set then apply limiter + if limit >= 0 { + rc = readers.NewLimitedReadCloser(rc, limit) + } + return rc, nil +} + +// Update in to the object with the modTime given of the given size +func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + update := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + return o.Object, o.Object.Update(ctx, in, src, options...) + } + _, err := o.f.put(ctx, in, src, options, update) + return err +} + +// ObjectInfo describes a wrapped fs.ObjectInfo for being the source +// +// This compresss the remote name and adjusts the size +type ObjectInfo struct { + fs.ObjectInfo + f *Fs +} + +func (f *Fs) newObjectInfo(src fs.ObjectInfo) *ObjectInfo { + return &ObjectInfo{ + ObjectInfo: src, + f: f, + } +} + +// Fs returns read only access to the Fs that this object is part of +func (o *ObjectInfo) Fs() fs.Info { + return o.f +} + +// Remote returns the remote path +func (o *ObjectInfo) Remote() string { + return o.f.compressFileName(o.ObjectInfo.Remote()) +} + +// Size returns the size of the file +func (o *ObjectInfo) Size() int64 { + return -1 +} + +// Hash returns the selected checksum of the file +// If no checksum is available it returns "" +func (o *ObjectInfo) Hash(ctx context.Context, hash hash.Type) (string, error) { + return "", nil +} + +// Check the interfaces are satisfied +var ( + _ fs.Fs = (*Fs)(nil) + // _ fs.Purger = (*Fs)(nil) + _ fs.Copier = (*Fs)(nil) + _ fs.Mover = (*Fs)(nil) + // _ fs.DirMover = (*Fs)(nil) + // _ fs.Commander = (*Fs)(nil) + _ fs.PutUncheckeder = (*Fs)(nil) + _ fs.PutStreamer = (*Fs)(nil) + // _ fs.CleanUpper = (*Fs)(nil) + _ fs.UnWrapper = (*Fs)(nil) + _ fs.ListRer = (*Fs)(nil) + // _ fs.Abouter = (*Fs)(nil) + _ fs.Wrapper = (*Fs)(nil) + // _ fs.MergeDirser = (*Fs)(nil) + // _ fs.DirCacheFlusher = (*Fs)(nil) + // _ fs.ChangeNotifier = (*Fs)(nil) + // _ fs.PublicLinker = (*Fs)(nil) + // _ fs.UserInfoer = (*Fs)(nil) + // _ fs.Disconnecter = (*Fs)(nil) + _ fs.ObjectInfo = (*ObjectInfo)(nil) + _ fs.Object = (*Object)(nil) + _ fs.ObjectUnWrapper = (*Object)(nil) + +// _ fs.IDer = (*Object)(nil) +// _ fs.SetTierer = (*Object)(nil) +// _ fs.GetTierer = (*Object)(nil) +)