cc634213a5
This also implements backend integration tests for the feature
1547 lines
45 KiB
Go
1547 lines
45 KiB
Go
// Package compress provides wrappers for Fs and Object which implement compression.
|
|
package compress
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"crypto/md5"
|
|
"encoding/base64"
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/buengese/sgzip"
|
|
"github.com/gabriel-vasile/mimetype"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/accounting"
|
|
"github.com/rclone/rclone/fs/chunkedreader"
|
|
"github.com/rclone/rclone/fs/config/configmap"
|
|
"github.com/rclone/rclone/fs/config/configstruct"
|
|
"github.com/rclone/rclone/fs/fspath"
|
|
"github.com/rclone/rclone/fs/hash"
|
|
"github.com/rclone/rclone/fs/log"
|
|
"github.com/rclone/rclone/fs/object"
|
|
"github.com/rclone/rclone/fs/operations"
|
|
)
|
|
|
|
// Globals
|
|
const (
|
|
initialChunkSize = 262144 // Initial and max sizes of chunks when reading parts of the file. Currently
|
|
maxChunkSize = 8388608 // at 256 KiB and 8 MiB.
|
|
|
|
bufferSize = 8388608
|
|
heuristicBytes = 1048576
|
|
minCompressionRatio = 1.1
|
|
|
|
gzFileExt = ".gz"
|
|
metaFileExt = ".json"
|
|
uncompressedFileExt = ".bin"
|
|
)
|
|
|
|
// Compression modes
|
|
const (
|
|
Uncompressed = 0
|
|
Gzip = 2
|
|
)
|
|
|
|
var nameRegexp = regexp.MustCompile(`^(.+?)\.([A-Za-z0-9-_]{11})$`)
|
|
|
|
// Register with Fs
|
|
func init() {
|
|
// Build compression mode options.
|
|
compressionModeOptions := []fs.OptionExample{
|
|
{ // Default compression mode options {
|
|
Value: "gzip",
|
|
Help: "Standard gzip compression with fastest parameters.",
|
|
},
|
|
}
|
|
|
|
// Register our remote
|
|
fs.Register(&fs.RegInfo{
|
|
Name: "compress",
|
|
Description: "Compress a remote",
|
|
NewFs: NewFs,
|
|
MetadataInfo: &fs.MetadataInfo{
|
|
Help: `Any metadata supported by the underlying remote is read and written.`,
|
|
},
|
|
Options: []fs.Option{{
|
|
Name: "remote",
|
|
Help: "Remote to compress.",
|
|
Required: true,
|
|
}, {
|
|
Name: "mode",
|
|
Help: "Compression mode.",
|
|
Default: "gzip",
|
|
Examples: compressionModeOptions,
|
|
}, {
|
|
Name: "level",
|
|
Help: `GZIP compression level (-2 to 9).
|
|
|
|
Generally -1 (default, equivalent to 5) is recommended.
|
|
Levels 1 to 9 increase compression at the cost of speed. Going past 6
|
|
generally offers very little return.
|
|
|
|
Level -2 uses Huffman encoding only. Only use if you know what you
|
|
are doing.
|
|
Level 0 turns off compression.`,
|
|
Default: sgzip.DefaultCompression,
|
|
Advanced: true,
|
|
}, {
|
|
Name: "ram_cache_limit",
|
|
Help: `Some remotes don't allow the upload of files with unknown size.
|
|
In this case the compressed file will need to be cached to determine
|
|
it's size.
|
|
|
|
Files smaller than this limit will be cached in RAM, files larger than
|
|
this limit will be cached on disk.`,
|
|
Default: fs.SizeSuffix(20 * 1024 * 1024),
|
|
Advanced: true,
|
|
}},
|
|
})
|
|
}
|
|
|
|
// Options defines the configuration for this backend
|
|
type Options struct {
|
|
Remote string `config:"remote"`
|
|
CompressionMode string `config:"mode"`
|
|
CompressionLevel int `config:"level"`
|
|
RAMCacheLimit fs.SizeSuffix `config:"ram_cache_limit"`
|
|
}
|
|
|
|
/*** FILESYSTEM FUNCTIONS ***/
|
|
|
|
// Fs represents a wrapped fs.Fs
|
|
type Fs struct {
|
|
fs.Fs
|
|
wrapper fs.Fs
|
|
name string
|
|
root string
|
|
opt Options
|
|
mode int // compression mode id
|
|
features *fs.Features // optional features
|
|
}
|
|
|
|
// NewFs constructs an Fs from the path, container:path
|
|
func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs, error) {
|
|
// Parse config into Options struct
|
|
opt := new(Options)
|
|
err := configstruct.Set(m, opt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
remote := opt.Remote
|
|
if strings.HasPrefix(remote, name+":") {
|
|
return nil, errors.New("can't point press remote at itself - check the value of the remote setting")
|
|
}
|
|
|
|
wInfo, wName, wPath, wConfig, err := fs.ConfigFs(remote)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse remote %q to wrap: %w", remote, err)
|
|
}
|
|
|
|
// Strip trailing slashes if they exist in rpath
|
|
rpath = strings.TrimRight(rpath, "\\/")
|
|
|
|
// First, check for a file
|
|
// If a metadata file was found, return an error. Otherwise, check for a directory
|
|
remotePath := fspath.JoinRootPath(wPath, makeMetadataName(rpath))
|
|
wrappedFs, err := wInfo.NewFs(ctx, wName, remotePath, wConfig)
|
|
if err != fs.ErrorIsFile {
|
|
remotePath = fspath.JoinRootPath(wPath, rpath)
|
|
wrappedFs, err = wInfo.NewFs(ctx, wName, remotePath, wConfig)
|
|
}
|
|
if err != nil && err != fs.ErrorIsFile {
|
|
return nil, fmt.Errorf("failed to make remote %s:%q to wrap: %w", wName, remotePath, err)
|
|
}
|
|
|
|
// Create the wrapping fs
|
|
f := &Fs{
|
|
Fs: wrappedFs,
|
|
name: name,
|
|
root: rpath,
|
|
opt: *opt,
|
|
mode: compressionModeFromName(opt.CompressionMode),
|
|
}
|
|
// Correct root if definitely pointing to a file
|
|
if err == fs.ErrorIsFile {
|
|
f.root = path.Dir(f.root)
|
|
if f.root == "." || f.root == "/" {
|
|
f.root = ""
|
|
}
|
|
}
|
|
// the features here are ones we could support, and they are
|
|
// ANDed with the ones from wrappedFs
|
|
f.features = (&fs.Features{
|
|
CaseInsensitive: true,
|
|
DuplicateFiles: false,
|
|
ReadMimeType: false,
|
|
WriteMimeType: false,
|
|
GetTier: true,
|
|
SetTier: true,
|
|
BucketBased: true,
|
|
CanHaveEmptyDirectories: true,
|
|
ReadMetadata: true,
|
|
WriteMetadata: true,
|
|
UserMetadata: true,
|
|
ReadDirMetadata: true,
|
|
WriteDirMetadata: true,
|
|
WriteDirSetModTime: true,
|
|
UserDirMetadata: true,
|
|
DirModTimeUpdatesOnWrite: true,
|
|
PartialUploads: true,
|
|
}).Fill(ctx, f).Mask(ctx, wrappedFs).WrapsFs(f, wrappedFs)
|
|
// We support reading MIME types no matter the wrapped fs
|
|
f.features.ReadMimeType = true
|
|
// We can only support putstream if we have serverside copy or move
|
|
if !operations.CanServerSideMove(wrappedFs) {
|
|
f.features.Disable("PutStream")
|
|
}
|
|
|
|
return f, err
|
|
}
|
|
|
|
func compressionModeFromName(name string) int {
|
|
switch name {
|
|
case "gzip":
|
|
return Gzip
|
|
default:
|
|
return Uncompressed
|
|
}
|
|
}
|
|
|
|
// Converts an int64 to base64
|
|
func int64ToBase64(number int64) string {
|
|
intBytes := make([]byte, 8)
|
|
binary.LittleEndian.PutUint64(intBytes, uint64(number))
|
|
return base64.RawURLEncoding.EncodeToString(intBytes)
|
|
}
|
|
|
|
// Converts base64 to int64
|
|
func base64ToInt64(str string) (int64, error) {
|
|
intBytes, err := base64.RawURLEncoding.DecodeString(str)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return int64(binary.LittleEndian.Uint64(intBytes)), nil
|
|
}
|
|
|
|
// Processes a file name for a compressed file. Returns the original file name, the extension, and the size of the original file.
|
|
// Returns -2 for the original size if the file is uncompressed.
|
|
func processFileName(compressedFileName string) (origFileName string, extension string, origSize int64, err error) {
|
|
// Separate the filename and size from the extension
|
|
extensionPos := strings.LastIndex(compressedFileName, ".")
|
|
if extensionPos == -1 {
|
|
return "", "", 0, errors.New("file name has no extension")
|
|
}
|
|
extension = compressedFileName[extensionPos:]
|
|
nameWithSize := compressedFileName[:extensionPos]
|
|
if extension == uncompressedFileExt {
|
|
return nameWithSize, extension, -2, nil
|
|
}
|
|
match := nameRegexp.FindStringSubmatch(nameWithSize)
|
|
if match == nil || len(match) != 3 {
|
|
return "", "", 0, errors.New("invalid filename")
|
|
}
|
|
size, err := base64ToInt64(match[2])
|
|
if err != nil {
|
|
return "", "", 0, errors.New("could not decode size")
|
|
}
|
|
return match[1], gzFileExt, size, nil
|
|
}
|
|
|
|
// Generates the file name for a metadata file
|
|
func makeMetadataName(remote string) (newRemote string) {
|
|
return remote + metaFileExt
|
|
}
|
|
|
|
// Checks whether a file is a metadata file
|
|
func isMetadataFile(filename string) bool {
|
|
return strings.HasSuffix(filename, metaFileExt)
|
|
}
|
|
|
|
// Checks whether a file is a metadata file and returns the original
|
|
// file name and a flag indicating whether it was a metadata file or
|
|
// not.
|
|
func unwrapMetadataFile(filename string) (string, bool) {
|
|
if !isMetadataFile(filename) {
|
|
return "", false
|
|
}
|
|
return filename[:len(filename)-len(metaFileExt)], true
|
|
}
|
|
|
|
// makeDataName generates the file name for a data file with specified compression mode
|
|
func makeDataName(remote string, size int64, mode int) (newRemote string) {
|
|
if mode != Uncompressed {
|
|
newRemote = remote + "." + int64ToBase64(size) + gzFileExt
|
|
} else {
|
|
newRemote = remote + uncompressedFileExt
|
|
}
|
|
return newRemote
|
|
}
|
|
|
|
// dataName generates the file name for data file
|
|
func (f *Fs) dataName(remote string, size int64, compressed bool) (name string) {
|
|
if !compressed {
|
|
return makeDataName(remote, size, Uncompressed)
|
|
}
|
|
return makeDataName(remote, size, f.mode)
|
|
}
|
|
|
|
// addData parses an object and adds it to the DirEntries
|
|
func (f *Fs) addData(entries *fs.DirEntries, o fs.Object) {
|
|
origFileName, _, size, err := processFileName(o.Remote())
|
|
if err != nil {
|
|
fs.Errorf(o, "Error on parsing file name: %v", err)
|
|
return
|
|
}
|
|
if size == -2 { // File is uncompressed
|
|
size = o.Size()
|
|
}
|
|
metaName := makeMetadataName(origFileName)
|
|
*entries = append(*entries, f.newObjectSizeAndNameOnly(o, metaName, size))
|
|
}
|
|
|
|
// addDir adds a dir to the dir entries
|
|
func (f *Fs) addDir(entries *fs.DirEntries, dir fs.Directory) {
|
|
*entries = append(*entries, f.newDir(dir))
|
|
}
|
|
|
|
// newDir returns a dir
|
|
func (f *Fs) newDir(dir fs.Directory) fs.Directory {
|
|
return dir // We're using the same dir
|
|
}
|
|
|
|
// processEntries parses the file names and adds metadata to the dir entries
|
|
func (f *Fs) processEntries(entries fs.DirEntries) (newEntries fs.DirEntries, err error) {
|
|
newEntries = entries[:0] // in place filter
|
|
for _, entry := range entries {
|
|
switch x := entry.(type) {
|
|
case fs.Object:
|
|
if !isMetadataFile(x.Remote()) {
|
|
f.addData(&newEntries, x) // Only care about data files for now; metadata files are redundant.
|
|
}
|
|
case fs.Directory:
|
|
f.addDir(&newEntries, x)
|
|
default:
|
|
return nil, fmt.Errorf("unknown object type %T", entry)
|
|
}
|
|
}
|
|
return newEntries, nil
|
|
}
|
|
|
|
// List the objects and directories in dir into entries. The
|
|
// entries can be returned in any order but should be for a
|
|
// complete directory.
|
|
//
|
|
// dir should be "" to list the root, and should not have
|
|
// trailing slashes.
|
|
//
|
|
// This should return ErrDirNotFound if the directory isn't
|
|
// found.
|
|
// List entries and process them
|
|
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
|
entries, err = f.Fs.List(ctx, dir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return f.processEntries(entries)
|
|
}
|
|
|
|
// ListR lists the objects and directories of the Fs starting
|
|
// from dir recursively into out.
|
|
//
|
|
// dir should be "" to start from the root, and should not
|
|
// have trailing slashes.
|
|
//
|
|
// This should return ErrDirNotFound if the directory isn't
|
|
// found.
|
|
//
|
|
// It should call callback for each tranche of entries read.
|
|
// These need not be returned in any particular order. If
|
|
// callback returns an error then the listing will stop
|
|
// immediately.
|
|
//
|
|
// Don't implement this unless you have a more efficient way
|
|
// of listing recursively that doing a directory traversal.
|
|
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
|
return f.Fs.Features().ListR(ctx, dir, func(entries fs.DirEntries) error {
|
|
newEntries, err := f.processEntries(entries)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return callback(newEntries)
|
|
})
|
|
}
|
|
|
|
// NewObject finds the Object at remote.
|
|
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
|
// Read metadata from metadata object
|
|
mo, err := f.Fs.NewObject(ctx, makeMetadataName(remote))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
meta, err := readMetadata(ctx, mo)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error decoding metadata: %w", err)
|
|
}
|
|
// Create our Object
|
|
o, err := f.Fs.NewObject(ctx, makeDataName(remote, meta.CompressionMetadata.Size, meta.Mode))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return f.newObject(o, mo, meta), nil
|
|
}
|
|
|
|
// checkCompressAndType checks if an object is compressible and determines it's mime type
|
|
// returns a multireader with the bytes that were read to determine mime type
|
|
func checkCompressAndType(in io.Reader) (newReader io.Reader, compressible bool, mimeType string, err error) {
|
|
in, wrap := accounting.UnWrap(in)
|
|
buf := make([]byte, heuristicBytes)
|
|
n, err := in.Read(buf)
|
|
buf = buf[:n]
|
|
if err != nil && err != io.EOF {
|
|
return nil, false, "", err
|
|
}
|
|
mime := mimetype.Detect(buf)
|
|
compressible, err = isCompressible(bytes.NewReader(buf))
|
|
if err != nil {
|
|
return nil, false, "", err
|
|
}
|
|
in = io.MultiReader(bytes.NewReader(buf), in)
|
|
return wrap(in), compressible, mime.String(), nil
|
|
}
|
|
|
|
// isCompressible checks the compression ratio of the provided data and returns true if the ratio exceeds
|
|
// the configured threshold
|
|
func isCompressible(r io.Reader) (bool, error) {
|
|
var b bytes.Buffer
|
|
w, err := sgzip.NewWriterLevel(&b, sgzip.DefaultCompression)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
n, err := io.Copy(w, r)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
err = w.Close()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
ratio := float64(n) / float64(b.Len())
|
|
return ratio > minCompressionRatio, nil
|
|
}
|
|
|
|
// verifyObjectHash verifies the Objects hash
|
|
func (f *Fs) verifyObjectHash(ctx context.Context, o fs.Object, hasher *hash.MultiHasher, ht hash.Type) error {
|
|
srcHash := hasher.Sums()[ht]
|
|
dstHash, err := o.Hash(ctx, ht)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read destination hash: %w", err)
|
|
}
|
|
if srcHash != "" && dstHash != "" && srcHash != dstHash {
|
|
// remove object
|
|
err = o.Remove(ctx)
|
|
if err != nil {
|
|
fs.Errorf(o, "Failed to remove corrupted object: %v", err)
|
|
}
|
|
return fmt.Errorf("corrupted on transfer: %v compressed hashes differ src(%s) %q vs dst(%s) %q", ht, f.Fs, srcHash, o.Fs(), dstHash)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type putFn func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)
|
|
|
|
type compressionResult struct {
|
|
err error
|
|
meta sgzip.GzipMetadata
|
|
}
|
|
|
|
// replicating some of operations.Rcat functionality because we want to support remotes without streaming
|
|
// support and of course cannot know the size of a compressed file before compressing it.
|
|
func (f *Fs) rcat(ctx context.Context, dstFileName string, in io.ReadCloser, modTime time.Time, options []fs.OpenOption) (o fs.Object, err error) {
|
|
|
|
// cache small files in memory and do normal upload
|
|
buf := make([]byte, f.opt.RAMCacheLimit)
|
|
if n, err := io.ReadFull(in, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
|
|
src := object.NewStaticObjectInfo(dstFileName, modTime, int64(len(buf[:n])), false, nil, f.Fs)
|
|
return f.Fs.Put(ctx, bytes.NewBuffer(buf[:n]), src, options...)
|
|
}
|
|
|
|
// Need to include what we already read
|
|
in = &ReadCloserWrapper{
|
|
Reader: io.MultiReader(bytes.NewReader(buf), in),
|
|
Closer: in,
|
|
}
|
|
|
|
canStream := f.Fs.Features().PutStream != nil
|
|
if canStream {
|
|
src := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, f.Fs)
|
|
return f.Fs.Features().PutStream(ctx, in, src, options...)
|
|
}
|
|
|
|
fs.Debugf(f, "Target remote doesn't support streaming uploads, creating temporary local file")
|
|
tempFile, err := os.CreateTemp("", "rclone-press-")
|
|
defer func() {
|
|
// these errors should be relatively uncritical and the upload should've succeeded so it's okay-ish
|
|
// to ignore them
|
|
_ = tempFile.Close()
|
|
_ = os.Remove(tempFile.Name())
|
|
}()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create temporary local FS to spool file: %w", err)
|
|
}
|
|
if _, err = io.Copy(tempFile, in); err != nil {
|
|
return nil, fmt.Errorf("failed to write temporary local file: %w", err)
|
|
}
|
|
if _, err = tempFile.Seek(0, 0); err != nil {
|
|
return nil, err
|
|
}
|
|
finfo, err := tempFile.Stat()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return f.Fs.Put(ctx, tempFile, object.NewStaticObjectInfo(dstFileName, modTime, finfo.Size(), false, nil, f.Fs))
|
|
}
|
|
|
|
// Put a compressed version of a file. Returns a wrappable object and metadata.
|
|
func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, mimeType string) (fs.Object, *ObjectMetadata, error) {
|
|
// Unwrap reader accounting
|
|
in, wrap := accounting.UnWrap(in)
|
|
|
|
// Add the metadata hasher
|
|
metaHasher := md5.New()
|
|
in = io.TeeReader(in, metaHasher)
|
|
|
|
// Compress the file
|
|
pipeReader, pipeWriter := io.Pipe()
|
|
results := make(chan compressionResult)
|
|
go func() {
|
|
gz, err := sgzip.NewWriterLevel(pipeWriter, f.opt.CompressionLevel)
|
|
if err != nil {
|
|
results <- compressionResult{err: err, meta: sgzip.GzipMetadata{}}
|
|
return
|
|
}
|
|
_, err = io.Copy(gz, in)
|
|
gzErr := gz.Close()
|
|
if gzErr != nil {
|
|
fs.Errorf(nil, "Failed to close compress: %v", gzErr)
|
|
if err == nil {
|
|
err = gzErr
|
|
}
|
|
}
|
|
closeErr := pipeWriter.Close()
|
|
if closeErr != nil {
|
|
fs.Errorf(nil, "Failed to close pipe: %v", closeErr)
|
|
if err == nil {
|
|
err = closeErr
|
|
}
|
|
}
|
|
results <- compressionResult{err: err, meta: gz.MetaData()}
|
|
}()
|
|
wrappedIn := wrap(bufio.NewReaderSize(pipeReader, bufferSize)) // Probably no longer needed as sgzip has it's own buffering
|
|
|
|
// Find a hash the destination supports to compute a hash of
|
|
// the compressed data.
|
|
ht := f.Fs.Hashes().GetOne()
|
|
var hasher *hash.MultiHasher
|
|
var err error
|
|
if ht != hash.None {
|
|
// unwrap the accounting again
|
|
wrappedIn, wrap = accounting.UnWrap(wrappedIn)
|
|
hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(ht))
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
// add the hasher and re-wrap the accounting
|
|
wrappedIn = io.TeeReader(wrappedIn, hasher)
|
|
wrappedIn = wrap(wrappedIn)
|
|
}
|
|
|
|
// Transfer the data
|
|
o, err := f.rcat(ctx, makeDataName(src.Remote(), src.Size(), f.mode), io.NopCloser(wrappedIn), src.ModTime(ctx), options)
|
|
//o, err := operations.Rcat(ctx, f.Fs, makeDataName(src.Remote(), src.Size(), f.mode), io.NopCloser(wrappedIn), src.ModTime(ctx))
|
|
if err != nil {
|
|
if o != nil {
|
|
removeErr := o.Remove(ctx)
|
|
if removeErr != nil {
|
|
fs.Errorf(o, "Failed to remove partially transferred object: %v", err)
|
|
}
|
|
}
|
|
return nil, nil, err
|
|
}
|
|
// Check whether we got an error during compression
|
|
result := <-results
|
|
err = result.err
|
|
if err != nil {
|
|
if o != nil {
|
|
removeErr := o.Remove(ctx)
|
|
if removeErr != nil {
|
|
fs.Errorf(o, "Failed to remove partially compressed object: %v", err)
|
|
}
|
|
}
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Generate metadata
|
|
meta := newMetadata(result.meta.Size, f.mode, result.meta, hex.EncodeToString(metaHasher.Sum(nil)), mimeType)
|
|
|
|
// Check the hashes of the compressed data if we were comparing them
|
|
if ht != hash.None && hasher != nil {
|
|
err = f.verifyObjectHash(ctx, o, hasher, ht)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
return o, meta, nil
|
|
}
|
|
|
|
// Put an uncompressed version of a file. Returns a wrappable object and metadata.
|
|
func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, put putFn, options []fs.OpenOption, mimeType string) (fs.Object, *ObjectMetadata, error) {
|
|
// Unwrap the accounting, add our metadata hasher, then wrap it back on
|
|
in, wrap := accounting.UnWrap(in)
|
|
|
|
hs := hash.NewHashSet(hash.MD5)
|
|
ht := f.Fs.Hashes().GetOne()
|
|
if !hs.Contains(ht) {
|
|
hs.Add(ht)
|
|
}
|
|
metaHasher, err := hash.NewMultiHasherTypes(hs)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
in = io.TeeReader(in, metaHasher)
|
|
wrappedIn := wrap(in)
|
|
|
|
// Put the object
|
|
o, err := put(ctx, wrappedIn, f.wrapInfo(src, makeDataName(src.Remote(), src.Size(), Uncompressed), src.Size()), options...)
|
|
if err != nil {
|
|
if o != nil {
|
|
removeErr := o.Remove(ctx)
|
|
if removeErr != nil {
|
|
fs.Errorf(o, "Failed to remove partially transferred object: %v", err)
|
|
}
|
|
}
|
|
return nil, nil, err
|
|
}
|
|
// Check the hashes of the compressed data if we were comparing them
|
|
if ht != hash.None {
|
|
err := f.verifyObjectHash(ctx, o, metaHasher, ht)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
// Return our object and metadata
|
|
sum, err := metaHasher.Sum(hash.MD5)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
return o, newMetadata(o.Size(), Uncompressed, sgzip.GzipMetadata{}, hex.EncodeToString(sum), mimeType), nil
|
|
}
|
|
|
|
// This function will write a metadata struct to a metadata Object for an src. Returns a wrappable metadata object.
|
|
func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (mo fs.Object, err error) {
|
|
// Generate the metadata contents
|
|
data, err := json.Marshal(meta)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
metaReader := bytes.NewReader(data)
|
|
|
|
// Put the data
|
|
mo, err = put(ctx, metaReader, f.wrapInfo(src, makeMetadataName(src.Remote()), int64(len(data))), options...)
|
|
if err != nil {
|
|
if mo != nil {
|
|
removeErr := mo.Remove(ctx)
|
|
if removeErr != nil {
|
|
fs.Errorf(mo, "Failed to remove partially transferred object: %v", err)
|
|
}
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return mo, nil
|
|
}
|
|
|
|
// This function will put both the data and metadata for an Object.
|
|
// putData is the function used for data, while putMeta is the function used for metadata.
|
|
// The putData function will only be used when the object is not compressible if the
|
|
// data is compressible this parameter will be ignored.
|
|
func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption,
|
|
putData putFn, putMeta putFn, compressible bool, mimeType string) (*Object, error) {
|
|
// Put file then metadata
|
|
var dataObject fs.Object
|
|
var meta *ObjectMetadata
|
|
var err error
|
|
if compressible {
|
|
dataObject, meta, err = f.putCompress(ctx, in, src, options, mimeType)
|
|
} else {
|
|
dataObject, meta, err = f.putUncompress(ctx, in, src, putData, options, mimeType)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mo, err := f.putMetadata(ctx, meta, src, options, putMeta)
|
|
|
|
// meta data upload may fail. in this case we try to remove the original object
|
|
if err != nil {
|
|
removeError := dataObject.Remove(ctx)
|
|
if removeError != nil {
|
|
return nil, removeError
|
|
}
|
|
return nil, err
|
|
}
|
|
return f.newObject(dataObject, mo, meta), nil
|
|
}
|
|
|
|
// Put in to the remote path with the modTime given of the given size
|
|
//
|
|
// May create the object even if it returns an error - if so
|
|
// will return the object and the error, otherwise will return
|
|
// nil and the error
|
|
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
// If there's already an existent objects we need to make sure to explicitly update it to make sure we don't leave
|
|
// orphaned data. Alternatively we could also deleted (which would simpler) but has the disadvantage that it
|
|
// destroys all server-side versioning.
|
|
o, err := f.NewObject(ctx, src.Remote())
|
|
if err == fs.ErrorObjectNotFound {
|
|
// Get our file compressibility
|
|
in, compressible, mimeType, err := checkCompressAndType(in)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Put, f.Fs.Put, compressible, mimeType)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return o, o.Update(ctx, in, src, options...)
|
|
}
|
|
|
|
// PutStream uploads to the remote path with the modTime given of indeterminate size
|
|
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
oldObj, err := f.NewObject(ctx, src.Remote())
|
|
if err != nil && err != fs.ErrorObjectNotFound {
|
|
return nil, err
|
|
}
|
|
found := err == nil
|
|
|
|
in, compressible, mimeType, err := checkCompressAndType(in)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
newObj, err := f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Features().PutStream, f.Fs.Put, compressible, mimeType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Our transfer is now complete. We have to make sure to remove the old object because our new object will
|
|
// have a different name except when both the old and the new object where uncompressed.
|
|
if found && (oldObj.(*Object).meta.Mode != Uncompressed || compressible) {
|
|
err = oldObj.(*Object).Object.Remove(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't remove original object: %w", err)
|
|
}
|
|
}
|
|
|
|
// If our new object is compressed we have to rename it with the correct size.
|
|
// Uncompressed objects don't store the size in the name so we they'll already have the correct name.
|
|
if compressible {
|
|
wrapObj, err := operations.Move(ctx, f.Fs, nil, f.dataName(src.Remote(), newObj.size, compressible), newObj.Object)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't rename streamed object: %w", err)
|
|
}
|
|
newObj.Object = wrapObj
|
|
}
|
|
return newObj, nil
|
|
}
|
|
|
|
// Temporarily disabled. There might be a way to implement this correctly but with the current handling metadata duplicate objects
|
|
// will break stuff. Right no I can't think of a way to make this work.
|
|
|
|
// PutUnchecked uploads the object
|
|
//
|
|
// This will create a duplicate if we upload a new file without
|
|
// checking to see if there is one already - use Put() for that.
|
|
|
|
// Hashes returns the supported hash sets.
|
|
func (f *Fs) Hashes() hash.Set {
|
|
return hash.Set(hash.MD5)
|
|
}
|
|
|
|
// Mkdir makes the directory (container, bucket)
|
|
//
|
|
// Shouldn't return an error if it already exists
|
|
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
|
return f.Fs.Mkdir(ctx, dir)
|
|
}
|
|
|
|
// MkdirMetadata makes the root directory of the Fs object
|
|
func (f *Fs) MkdirMetadata(ctx context.Context, dir string, metadata fs.Metadata) (fs.Directory, error) {
|
|
if do := f.Fs.Features().MkdirMetadata; do != nil {
|
|
return do(ctx, dir, metadata)
|
|
}
|
|
return nil, fs.ErrorNotImplemented
|
|
}
|
|
|
|
// Rmdir removes the directory (container, bucket) if empty
|
|
//
|
|
// Return an error if it doesn't exist or isn't empty
|
|
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
|
return f.Fs.Rmdir(ctx, dir)
|
|
}
|
|
|
|
// Purge all files in the root and the root directory
|
|
//
|
|
// Implement this if you have a way of deleting all the files
|
|
// quicker than just running Remove() on the result of List()
|
|
//
|
|
// Return an error if it doesn't exist
|
|
func (f *Fs) Purge(ctx context.Context, dir string) error {
|
|
do := f.Fs.Features().Purge
|
|
if do == nil {
|
|
return fs.ErrorCantPurge
|
|
}
|
|
return do(ctx, dir)
|
|
}
|
|
|
|
// Copy src to this remote using server side copy operations.
|
|
//
|
|
// This is stored with the remote path given.
|
|
//
|
|
// It returns the destination Object and a possible error.
|
|
//
|
|
// Will only be called if src.Fs().Name() == f.Name()
|
|
//
|
|
// If it isn't possible then return fs.ErrorCantCopy
|
|
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
|
do := f.Fs.Features().Copy
|
|
if do == nil {
|
|
return nil, fs.ErrorCantCopy
|
|
}
|
|
o, ok := src.(*Object)
|
|
if !ok {
|
|
return nil, fs.ErrorCantCopy
|
|
}
|
|
// We might be trying to overwrite a file with a newer version but due to size difference the name
|
|
// is different. Therefore we have to remove the old file first (if it exists).
|
|
dstFile, err := f.NewObject(ctx, remote)
|
|
if err != nil && err != fs.ErrorObjectNotFound {
|
|
return nil, err
|
|
}
|
|
if err == nil {
|
|
err := dstFile.Remove(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Copy over metadata
|
|
err = o.loadMetadataIfNotLoaded(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
newFilename := makeMetadataName(remote)
|
|
moResult, err := do(ctx, o.mo, newFilename)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Copy over data
|
|
newFilename = makeDataName(remote, src.Size(), o.meta.Mode)
|
|
oResult, err := do(ctx, o.Object, newFilename)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return f.newObject(oResult, moResult, o.meta), nil
|
|
}
|
|
|
|
// Move src to this remote using server side move operations.
|
|
//
|
|
// This is stored with the remote path given.
|
|
//
|
|
// It returns the destination Object and a possible error.
|
|
//
|
|
// Will only be called if src.Fs().Name() == f.Name()
|
|
//
|
|
// If it isn't possible then return fs.ErrorCantMove
|
|
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
|
do := f.Fs.Features().Move
|
|
if do == nil {
|
|
return nil, fs.ErrorCantMove
|
|
}
|
|
o, ok := src.(*Object)
|
|
if !ok {
|
|
return nil, fs.ErrorCantMove
|
|
}
|
|
// We might be trying to overwrite a file with a newer version but due to size difference the name
|
|
// is different. Therefore we have to remove the old file first (if it exists).
|
|
dstFile, err := f.NewObject(ctx, remote)
|
|
if err != nil && err != fs.ErrorObjectNotFound {
|
|
return nil, err
|
|
}
|
|
if err == nil {
|
|
err := dstFile.Remove(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Move metadata
|
|
err = o.loadMetadataIfNotLoaded(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
newFilename := makeMetadataName(remote)
|
|
moResult, err := do(ctx, o.mo, newFilename)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Move data
|
|
newFilename = makeDataName(remote, src.Size(), o.meta.Mode)
|
|
oResult, err := do(ctx, o.Object, newFilename)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return f.newObject(oResult, moResult, o.meta), nil
|
|
}
|
|
|
|
// DirMove moves src, srcRemote to this remote at dstRemote
|
|
// using server side move operations.
|
|
//
|
|
// Will only be called if src.Fs().Name() == f.Name()
|
|
//
|
|
// If it isn't possible then return fs.ErrorCantDirMove
|
|
//
|
|
// If destination exists then return fs.ErrorDirExists
|
|
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error {
|
|
do := f.Fs.Features().DirMove
|
|
if do == nil {
|
|
return fs.ErrorCantDirMove
|
|
}
|
|
srcFs, ok := src.(*Fs)
|
|
if !ok {
|
|
fs.Debugf(srcFs, "Can't move directory - not same remote type")
|
|
return fs.ErrorCantDirMove
|
|
}
|
|
return do(ctx, srcFs.Fs, srcRemote, dstRemote)
|
|
}
|
|
|
|
// DirSetModTime sets the directory modtime for dir
|
|
func (f *Fs) DirSetModTime(ctx context.Context, dir string, modTime time.Time) error {
|
|
if do := f.Fs.Features().DirSetModTime; do != nil {
|
|
return do(ctx, dir, modTime)
|
|
}
|
|
return fs.ErrorNotImplemented
|
|
}
|
|
|
|
// CleanUp the trash in the Fs
|
|
//
|
|
// Implement this if you have a way of emptying the trash or
|
|
// otherwise cleaning up old versions of files.
|
|
func (f *Fs) CleanUp(ctx context.Context) error {
|
|
do := f.Fs.Features().CleanUp
|
|
if do == nil {
|
|
return errors.New("not supported by underlying remote")
|
|
}
|
|
return do(ctx)
|
|
}
|
|
|
|
// About gets quota information from the Fs
|
|
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
|
do := f.Fs.Features().About
|
|
if do == nil {
|
|
return nil, errors.New("not supported by underlying remote")
|
|
}
|
|
return do(ctx)
|
|
}
|
|
|
|
// UnWrap returns the Fs that this Fs is wrapping
|
|
func (f *Fs) UnWrap() fs.Fs {
|
|
return f.Fs
|
|
}
|
|
|
|
// WrapFs returns the Fs that is wrapping this Fs
|
|
func (f *Fs) WrapFs() fs.Fs {
|
|
return f.wrapper
|
|
}
|
|
|
|
// SetWrapper sets the Fs that is wrapping this Fs
|
|
func (f *Fs) SetWrapper(wrapper fs.Fs) {
|
|
f.wrapper = wrapper
|
|
}
|
|
|
|
// MergeDirs merges the contents of all the directories passed
|
|
// in into the first one and rmdirs the other directories.
|
|
func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error {
|
|
do := f.Fs.Features().MergeDirs
|
|
if do == nil {
|
|
return errors.New("MergeDirs not supported")
|
|
}
|
|
out := make([]fs.Directory, len(dirs))
|
|
for i, dir := range dirs {
|
|
out[i] = fs.NewDirCopy(ctx, dir).SetRemote(dir.Remote())
|
|
}
|
|
return do(ctx, out)
|
|
}
|
|
|
|
// DirCacheFlush resets the directory cache - used in testing
|
|
// as an optional interface
|
|
func (f *Fs) DirCacheFlush() {
|
|
do := f.Fs.Features().DirCacheFlush
|
|
if do != nil {
|
|
do()
|
|
}
|
|
}
|
|
|
|
// ChangeNotify calls the passed function with a path
|
|
// that has had changes. If the implementation
|
|
// uses polling, it should adhere to the given interval.
|
|
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
|
|
do := f.Fs.Features().ChangeNotify
|
|
if do == nil {
|
|
return
|
|
}
|
|
wrappedNotifyFunc := func(path string, entryType fs.EntryType) {
|
|
fs.Logf(f, "path %q entryType %d", path, entryType)
|
|
var (
|
|
wrappedPath string
|
|
isMetadataFile bool
|
|
)
|
|
switch entryType {
|
|
case fs.EntryDirectory:
|
|
wrappedPath = path
|
|
case fs.EntryObject:
|
|
// Note: All we really need to do to monitor the object is to check whether the metadata changed,
|
|
// as the metadata contains the hash. This will work unless there's a hash collision and the sizes stay the same.
|
|
wrappedPath, isMetadataFile = unwrapMetadataFile(path)
|
|
if !isMetadataFile {
|
|
return
|
|
}
|
|
default:
|
|
fs.Errorf(path, "press ChangeNotify: ignoring unknown EntryType %d", entryType)
|
|
return
|
|
}
|
|
notifyFunc(wrappedPath, entryType)
|
|
}
|
|
do(ctx, wrappedNotifyFunc, pollIntervalChan)
|
|
}
|
|
|
|
// PublicLink generates a public link to the remote path (usually readable by anyone)
|
|
func (f *Fs) PublicLink(ctx context.Context, remote string, duration fs.Duration, unlink bool) (string, error) {
|
|
do := f.Fs.Features().PublicLink
|
|
if do == nil {
|
|
return "", errors.New("can't PublicLink: not supported by underlying remote")
|
|
}
|
|
o, err := f.NewObject(ctx, remote)
|
|
if err != nil {
|
|
// assume it is a directory
|
|
return do(ctx, remote, duration, unlink)
|
|
}
|
|
return do(ctx, o.(*Object).Object.Remote(), duration, unlink)
|
|
}
|
|
|
|
/*** OBJECT FUNCTIONS ***/
|
|
|
|
// ObjectMetadata describes the metadata for an Object.
|
|
type ObjectMetadata struct {
|
|
Mode int // Compression mode of the file.
|
|
Size int64 // Size of the object.
|
|
MD5 string // MD5 hash of the file.
|
|
MimeType string // Mime type of the file
|
|
CompressionMetadata sgzip.GzipMetadata
|
|
}
|
|
|
|
// Object with external metadata
|
|
type Object struct {
|
|
fs.Object // Wraps around data object for this object
|
|
f *Fs // Filesystem object is in
|
|
mo fs.Object // Metadata object for this object
|
|
moName string // Metadata file name for this object
|
|
size int64 // Size of this object
|
|
meta *ObjectMetadata // Metadata struct for this object (nil if not loaded)
|
|
}
|
|
|
|
// This function generates a metadata object
|
|
func newMetadata(size int64, mode int, cmeta sgzip.GzipMetadata, md5 string, mimeType string) *ObjectMetadata {
|
|
meta := new(ObjectMetadata)
|
|
meta.Size = size
|
|
meta.Mode = mode
|
|
meta.CompressionMetadata = cmeta
|
|
meta.MD5 = md5
|
|
meta.MimeType = mimeType
|
|
return meta
|
|
}
|
|
|
|
// This function will read the metadata from a metadata object.
|
|
func readMetadata(ctx context.Context, mo fs.Object) (meta *ObjectMetadata, err error) {
|
|
// Open our meradata object
|
|
rc, err := mo.Open(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer fs.CheckClose(rc, &err)
|
|
jr := json.NewDecoder(rc)
|
|
meta = new(ObjectMetadata)
|
|
if err = jr.Decode(meta); err != nil {
|
|
return nil, err
|
|
}
|
|
return meta, nil
|
|
}
|
|
|
|
// Remove removes this object
|
|
func (o *Object) Remove(ctx context.Context) error {
|
|
err := o.loadMetadataObjectIfNotLoaded(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = o.mo.Remove(ctx)
|
|
objErr := o.Object.Remove(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return objErr
|
|
}
|
|
|
|
// ReadCloserWrapper combines a Reader and a Closer to a ReadCloser
|
|
type ReadCloserWrapper struct {
|
|
io.Reader
|
|
io.Closer
|
|
}
|
|
|
|
// Update in to the object with the modTime given of the given size
|
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
|
|
err = o.loadMetadataIfNotLoaded(ctx) // Loads metadata object too
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Function that updates metadata object
|
|
updateMeta := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
return o.mo, o.mo.Update(ctx, in, src, options...)
|
|
}
|
|
|
|
in, compressible, mimeType, err := checkCompressAndType(in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Since we are storing the filesize in the name the new object may have different name than the old
|
|
// We'll make sure to delete the old object in this case
|
|
var newObject *Object
|
|
origName := o.Remote()
|
|
if o.meta.Mode != Uncompressed || compressible {
|
|
newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if newObject.Object.Remote() != o.Object.Remote() {
|
|
if removeErr := o.Object.Remove(ctx); removeErr != nil {
|
|
return removeErr
|
|
}
|
|
}
|
|
} else {
|
|
// We can only support update when BOTH the old and the new object are uncompressed because only then
|
|
// the filesize will be known beforehand and name will stay the same
|
|
update := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
return o.Object, o.Object.Update(ctx, in, src, options...)
|
|
}
|
|
// If we are, just update the object and metadata
|
|
newObject, err = o.f.putWithCustomFunctions(ctx, in, src, options, update, updateMeta, compressible, mimeType)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
// Update object metadata and return
|
|
o.Object = newObject.Object
|
|
o.meta = newObject.meta
|
|
o.size = newObject.size
|
|
return nil
|
|
}
|
|
|
|
// This will initialize the variables of a new press Object. The metadata object, mo, and metadata struct, meta, must be specified.
|
|
func (f *Fs) newObject(o fs.Object, mo fs.Object, meta *ObjectMetadata) *Object {
|
|
if o == nil {
|
|
log.Trace(nil, "newObject(%#v, %#v, %#v) called with nil o", o, mo, meta)
|
|
}
|
|
return &Object{
|
|
Object: o,
|
|
f: f,
|
|
mo: mo,
|
|
moName: mo.Remote(),
|
|
size: meta.Size,
|
|
meta: meta,
|
|
}
|
|
}
|
|
|
|
// This initializes the variables of a press Object with only the size. The metadata will be loaded later on demand.
|
|
func (f *Fs) newObjectSizeAndNameOnly(o fs.Object, moName string, size int64) *Object {
|
|
if o == nil {
|
|
log.Trace(nil, "newObjectSizeAndNameOnly(%#v, %#v, %#v) called with nil o", o, moName, size)
|
|
}
|
|
return &Object{
|
|
Object: o,
|
|
f: f,
|
|
mo: nil,
|
|
moName: moName,
|
|
size: size,
|
|
meta: nil,
|
|
}
|
|
}
|
|
|
|
// Shutdown the backend, closing any background tasks and any
|
|
// cached connections.
|
|
func (f *Fs) Shutdown(ctx context.Context) error {
|
|
do := f.Fs.Features().Shutdown
|
|
if do == nil {
|
|
return nil
|
|
}
|
|
return do(ctx)
|
|
}
|
|
|
|
// This loads the metadata of a press Object if it's not loaded yet
|
|
func (o *Object) loadMetadataIfNotLoaded(ctx context.Context) (err error) {
|
|
err = o.loadMetadataObjectIfNotLoaded(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if o.meta == nil {
|
|
o.meta, err = readMetadata(ctx, o.mo)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// This loads the metadata object of a press Object if it's not loaded yet
|
|
func (o *Object) loadMetadataObjectIfNotLoaded(ctx context.Context) (err error) {
|
|
if o.mo == nil {
|
|
o.mo, err = o.f.Fs.NewObject(ctx, o.moName)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Fs returns read only access to the Fs that this object is part of
|
|
func (o *Object) Fs() fs.Info {
|
|
return o.f
|
|
}
|
|
|
|
// Return a string version
|
|
func (o *Object) String() string {
|
|
if o == nil {
|
|
return "<nil>"
|
|
}
|
|
return o.Remote()
|
|
}
|
|
|
|
// Remote returns the remote path
|
|
func (o *Object) Remote() string {
|
|
origFileName, _, _, err := processFileName(o.Object.Remote())
|
|
if err != nil {
|
|
fs.Errorf(o.f, "Could not get remote path for: %s", o.Object.Remote())
|
|
return o.Object.Remote()
|
|
}
|
|
return origFileName
|
|
}
|
|
|
|
// Size returns the size of the file
|
|
func (o *Object) Size() int64 {
|
|
if o.meta == nil {
|
|
return o.size
|
|
}
|
|
return o.meta.Size
|
|
}
|
|
|
|
// MimeType returns the MIME type of the file
|
|
func (o *Object) MimeType(ctx context.Context) string {
|
|
err := o.loadMetadataIfNotLoaded(ctx)
|
|
if err != nil {
|
|
return "error/error"
|
|
}
|
|
return o.meta.MimeType
|
|
}
|
|
|
|
// Metadata returns metadata for an object
|
|
//
|
|
// It should return nil if there is no Metadata
|
|
func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) {
|
|
err := o.loadMetadataIfNotLoaded(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
do, ok := o.mo.(fs.Metadataer)
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
return do.Metadata(ctx)
|
|
}
|
|
|
|
// SetMetadata sets metadata for an Object
|
|
//
|
|
// It should return fs.ErrorNotImplemented if it can't set metadata
|
|
func (o *Object) SetMetadata(ctx context.Context, metadata fs.Metadata) error {
|
|
do, ok := o.Object.(fs.SetMetadataer)
|
|
if !ok {
|
|
return fs.ErrorNotImplemented
|
|
}
|
|
return do.SetMetadata(ctx, metadata)
|
|
}
|
|
|
|
// Hash returns the selected checksum of the file
|
|
// If no checksum is available it returns ""
|
|
func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error) {
|
|
if ht != hash.MD5 {
|
|
return "", hash.ErrUnsupported
|
|
}
|
|
err := o.loadMetadataIfNotLoaded(ctx)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return o.meta.MD5, nil
|
|
}
|
|
|
|
// SetTier performs changing storage tier of the Object if
|
|
// multiple storage classes supported
|
|
func (o *Object) SetTier(tier string) error {
|
|
do, ok := o.Object.(fs.SetTierer)
|
|
mdo, mok := o.mo.(fs.SetTierer)
|
|
if !(ok && mok) {
|
|
return errors.New("press: underlying remote does not support SetTier")
|
|
}
|
|
if err := mdo.SetTier(tier); err != nil {
|
|
return err
|
|
}
|
|
return do.SetTier(tier)
|
|
}
|
|
|
|
// GetTier returns storage tier or class of the Object
|
|
func (o *Object) GetTier() string {
|
|
do, ok := o.mo.(fs.GetTierer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.GetTier()
|
|
}
|
|
|
|
// UnWrap returns the wrapped Object
|
|
func (o *Object) UnWrap() fs.Object {
|
|
return o.Object
|
|
}
|
|
|
|
// Open opens the file for read. Call Close() on the returned io.ReadCloser. Note that this call requires quite a bit of overhead.
|
|
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
|
|
err = o.loadMetadataIfNotLoaded(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// If we're uncompressed, just pass this to the underlying object
|
|
if o.meta.Mode == Uncompressed {
|
|
return o.Object.Open(ctx, options...)
|
|
}
|
|
// Get offset and limit from OpenOptions, pass the rest to the underlying remote
|
|
var openOptions = []fs.OpenOption{&fs.SeekOption{Offset: 0}}
|
|
var offset, limit int64 = 0, -1
|
|
for _, option := range options {
|
|
switch x := option.(type) {
|
|
case *fs.SeekOption:
|
|
offset = x.Offset
|
|
case *fs.RangeOption:
|
|
offset, limit = x.Decode(o.Size())
|
|
default:
|
|
openOptions = append(openOptions, option)
|
|
}
|
|
}
|
|
// Get a chunkedreader for the wrapped object
|
|
chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize)
|
|
// Get file handle
|
|
var file io.Reader
|
|
if offset != 0 {
|
|
file, err = sgzip.NewReaderAt(chunkedReader, &o.meta.CompressionMetadata, offset)
|
|
} else {
|
|
file, err = sgzip.NewReader(chunkedReader)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var fileReader io.Reader
|
|
if limit != -1 {
|
|
fileReader = io.LimitReader(file, limit)
|
|
} else {
|
|
fileReader = file
|
|
}
|
|
// Return a ReadCloser
|
|
return ReadCloserWrapper{Reader: fileReader, Closer: chunkedReader}, nil
|
|
}
|
|
|
|
// ObjectInfo describes a wrapped fs.ObjectInfo for being the source
|
|
type ObjectInfo struct {
|
|
src fs.ObjectInfo
|
|
fs *Fs
|
|
remote string
|
|
size int64
|
|
}
|
|
|
|
func (f *Fs) wrapInfo(src fs.ObjectInfo, newRemote string, size int64) *ObjectInfo {
|
|
return &ObjectInfo{
|
|
src: src,
|
|
fs: f,
|
|
remote: newRemote,
|
|
size: size,
|
|
}
|
|
}
|
|
|
|
// Fs returns read only access to the Fs that this object is part of
|
|
func (o *ObjectInfo) Fs() fs.Info {
|
|
if o.fs == nil {
|
|
panic("stub ObjectInfo")
|
|
}
|
|
return o.fs
|
|
}
|
|
|
|
// String returns string representation
|
|
func (o *ObjectInfo) String() string {
|
|
return o.src.String()
|
|
}
|
|
|
|
// Storable returns whether object is storable
|
|
func (o *ObjectInfo) Storable() bool {
|
|
return o.src.Storable()
|
|
}
|
|
|
|
// Remote returns the remote path
|
|
func (o *ObjectInfo) Remote() string {
|
|
if o.remote != "" {
|
|
return o.remote
|
|
}
|
|
return o.src.Remote()
|
|
}
|
|
|
|
// Size returns the size of the file
|
|
func (o *ObjectInfo) Size() int64 {
|
|
return o.size
|
|
}
|
|
|
|
// ModTime returns the modification time
|
|
func (o *ObjectInfo) ModTime(ctx context.Context) time.Time {
|
|
return o.src.ModTime(ctx)
|
|
}
|
|
|
|
// Hash returns the selected checksum of the file
|
|
// If no checksum is available it returns ""
|
|
func (o *ObjectInfo) Hash(ctx context.Context, ht hash.Type) (string, error) {
|
|
return "", nil // cannot know the checksum
|
|
}
|
|
|
|
// ID returns the ID of the Object if known, or "" if not
|
|
func (o *ObjectInfo) ID() string {
|
|
do, ok := o.src.(fs.IDer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.ID()
|
|
}
|
|
|
|
// MimeType returns the content type of the Object if
|
|
// known, or "" if not
|
|
func (o *ObjectInfo) MimeType(ctx context.Context) string {
|
|
do, ok := o.src.(fs.MimeTyper)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.MimeType(ctx)
|
|
}
|
|
|
|
// UnWrap returns the Object that this Object is wrapping or
|
|
// nil if it isn't wrapping anything
|
|
func (o *ObjectInfo) UnWrap() fs.Object {
|
|
return fs.UnWrapObjectInfo(o.src)
|
|
}
|
|
|
|
// Metadata returns metadata for an object
|
|
//
|
|
// It should return nil if there is no Metadata
|
|
func (o *ObjectInfo) Metadata(ctx context.Context) (fs.Metadata, error) {
|
|
do, ok := o.src.(fs.Metadataer)
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
return do.Metadata(ctx)
|
|
}
|
|
|
|
// GetTier returns storage tier or class of the Object
|
|
func (o *ObjectInfo) GetTier() string {
|
|
do, ok := o.src.(fs.GetTierer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.GetTier()
|
|
}
|
|
|
|
// ID returns the ID of the Object if known, or "" if not
|
|
func (o *Object) ID() string {
|
|
do, ok := o.Object.(fs.IDer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.ID()
|
|
}
|
|
|
|
// Name of the remote (as passed into NewFs)
|
|
func (f *Fs) Name() string {
|
|
return f.name
|
|
}
|
|
|
|
// Root of the remote (as passed into NewFs)
|
|
func (f *Fs) Root() string {
|
|
return f.root
|
|
}
|
|
|
|
// Features returns the optional features of this Fs
|
|
func (f *Fs) Features() *fs.Features {
|
|
return f.features
|
|
}
|
|
|
|
// Return a string version
|
|
func (f *Fs) String() string {
|
|
return fmt.Sprintf("Compressed: %s:%s", f.name, f.root)
|
|
}
|
|
|
|
// Precision returns the precision of this Fs
|
|
func (f *Fs) Precision() time.Duration {
|
|
return f.Fs.Precision()
|
|
}
|
|
|
|
// Check the interfaces are satisfied
|
|
var (
|
|
_ fs.Fs = (*Fs)(nil)
|
|
_ fs.Purger = (*Fs)(nil)
|
|
_ fs.Copier = (*Fs)(nil)
|
|
_ fs.Mover = (*Fs)(nil)
|
|
_ fs.DirMover = (*Fs)(nil)
|
|
_ fs.DirSetModTimer = (*Fs)(nil)
|
|
_ fs.MkdirMetadataer = (*Fs)(nil)
|
|
_ fs.PutStreamer = (*Fs)(nil)
|
|
_ fs.CleanUpper = (*Fs)(nil)
|
|
_ fs.UnWrapper = (*Fs)(nil)
|
|
_ fs.ListRer = (*Fs)(nil)
|
|
_ fs.Abouter = (*Fs)(nil)
|
|
_ fs.Wrapper = (*Fs)(nil)
|
|
_ fs.MergeDirser = (*Fs)(nil)
|
|
_ fs.DirCacheFlusher = (*Fs)(nil)
|
|
_ fs.ChangeNotifier = (*Fs)(nil)
|
|
_ fs.PublicLinker = (*Fs)(nil)
|
|
_ fs.Shutdowner = (*Fs)(nil)
|
|
_ fs.FullObjectInfo = (*ObjectInfo)(nil)
|
|
_ fs.FullObject = (*Object)(nil)
|
|
)
|