// Package chunker provides wrappers for Fs and Object which split large files in chunks package chunker import ( "bytes" "context" "crypto/md5" "crypto/sha1" "encoding/hex" "encoding/json" "fmt" gohash "hash" "io" "io/ioutil" "math/rand" "path" "regexp" "sort" "strconv" "strings" "sync" "time" "github.com/pkg/errors" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/cache" "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/fspath" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/operations" ) // // Chunker's composite files have one or more chunks // and optional metadata object. If it's present, // meta object is named after the original file. // // The only supported metadata format is simplejson atm. // It supports only per-file meta objects that are rudimentary, // used mostly for consistency checks (lazily for performance reasons). // Other formats can be developed that use an external meta store // free of these limitations, but this needs some support from // rclone core (e.g. metadata store interfaces). // // The following types of chunks are supported: // data and control, active and temporary. // Chunk type is identified by matching chunk file name // based on the chunk name format configured by user and transaction // style being used. // // Both data and control chunks can be either temporary (aka hidden) // or active (non-temporary aka normal aka permanent). // An operation creates temporary chunks while it runs. // By completion it removes temporary and leaves active chunks. // // Temporary chunks have a special hardcoded suffix in addition // to the configured name pattern. // Temporary suffix includes so called transaction identifier // (abbreviated as `xactID` below), a generic non-negative base-36 "number" // used by parallel operations to share a composite object. // Chunker also accepts the longer decimal temporary suffix (obsolete), // which is transparently converted to the new format. In its maximum // length of 13 decimals it makes a 7-digit base-36 number. // // When transactions is set to the norename style, data chunks will // keep their temporary chunk names (with the transacion identifier // suffix). To distinguish them from temporary chunks, the txn field // of the metadata file is set to match the transaction identifier of // the data chunks. // // Chunker can tell data chunks from control chunks by the characters // located in the "hash placeholder" position of configured format. // Data chunks have decimal digits there. // Control chunks have in that position a short lowercase alphanumeric // string (starting with a letter) prepended by underscore. // // Metadata format v1 does not define any control chunk types, // they are currently ignored aka reserved. // In future they can be used to implement resumable uploads etc. // const ( ctrlTypeRegStr = `[a-z][a-z0-9]{2,6}` tempSuffixFormat = `_%04s` tempSuffixRegStr = `_([0-9a-z]{4,9})` tempSuffixRegOld = `\.\.tmp_([0-9]{10,13})` ) var ( // regular expressions to validate control type and temporary suffix ctrlTypeRegexp = regexp.MustCompile(`^` + ctrlTypeRegStr + `$`) tempSuffixRegexp = regexp.MustCompile(`^` + tempSuffixRegStr + `$`) ) // Normally metadata is a small piece of JSON (about 100-300 bytes). // The size of valid metadata must never exceed this limit. // Current maximum provides a reasonable room for future extensions. // // Please refrain from increasing it, this can cause old rclone versions // to fail, or worse, treat meta object as a normal file (see NewObject). // If more room is needed please bump metadata version forcing previous // releases to ask for upgrade, and offload extra info to a control chunk. // // And still chunker's primary function is to chunk large files // rather than serve as a generic metadata container. const maxMetadataSize = 1023 const maxMetadataSizeWritten = 255 // Current/highest supported metadata format. const metadataVersion = 2 // optimizeFirstChunk enables the following optimization in the Put: // If a single chunk is expected, put the first chunk using the // base target name instead of a temporary name, thus avoiding // extra rename operation. // Warning: this optimization is not transaction safe. const optimizeFirstChunk = false // revealHidden is a stub until chunker lands the `reveal hidden` option. const revealHidden = false // Prevent memory overflow due to specially crafted chunk name const maxSafeChunkNumber = 10000000 // Number of attempts to find unique transaction identifier const maxTransactionProbes = 100 // standard chunker errors var ( ErrChunkOverflow = errors.New("chunk number overflow") ErrMetaTooBig = errors.New("metadata is too big") ErrMetaUnknown = errors.New("unknown metadata, please upgrade rclone") ) // variants of baseMove's parameter delMode const ( delNever = 0 // don't delete, just move delAlways = 1 // delete destination before moving delFailed = 2 // move, then delete and try again if failed ) // Register with Fs func init() { fs.Register(&fs.RegInfo{ Name: "chunker", Description: "Transparently chunk/split large files", NewFs: NewFs, Options: []fs.Option{{ Name: "remote", Required: true, Help: `Remote to chunk/unchunk. Normally should contain a ':' and a path, e.g. "myremote:path/to/dir", "myremote:bucket" or maybe "myremote:" (not recommended).`, }, { Name: "chunk_size", Advanced: false, Default: fs.SizeSuffix(2147483648), // 2 GiB Help: `Files larger than chunk size will be split in chunks.`, }, { Name: "name_format", Advanced: true, Hide: fs.OptionHideCommandLine, Default: `*.rclone_chunk.###`, Help: `String format of chunk file names. The two placeholders are: base file name (*) and chunk number (#...). There must be one and only one asterisk and one or more consecutive hash characters. If chunk number has less digits than the number of hashes, it is left-padded by zeros. If there are more digits in the number, they are left as is. Possible chunk files are ignored if their name does not match given format.`, }, { Name: "start_from", Advanced: true, Hide: fs.OptionHideCommandLine, Default: 1, Help: `Minimum valid chunk number. Usually 0 or 1. By default chunk numbers start from 1.`, }, { Name: "meta_format", Advanced: true, Hide: fs.OptionHideCommandLine, Default: "simplejson", Help: `Format of the metadata object or "none". By default "simplejson". Metadata is a small JSON file named after the composite file.`, Examples: []fs.OptionExample{{ Value: "none", Help: `Do not use metadata files at all. Requires hash type "none".`, }, { Value: "simplejson", Help: `Simple JSON supports hash sums and chunk validation. It has the following fields: ver, size, nchunks, md5, sha1.`, }}, }, { Name: "hash_type", Advanced: false, Default: "md5", Help: `Choose how chunker handles hash sums. All modes but "none" require metadata.`, Examples: []fs.OptionExample{{ Value: "none", Help: `Pass any hash supported by wrapped remote for non-chunked files. Return nothing otherwise.`, }, { Value: "md5", Help: `MD5 for composite files.`, }, { Value: "sha1", Help: `SHA1 for composite files.`, }, { Value: "md5all", Help: `MD5 for all files.`, }, { Value: "sha1all", Help: `SHA1 for all files.`, }, { Value: "md5quick", Help: `Copying a file to chunker will request MD5 from the source. Falling back to SHA1 if unsupported.`, }, { Value: "sha1quick", Help: `Similar to "md5quick" but prefers SHA1 over MD5.`, }}, }, { Name: "fail_hard", Advanced: true, Default: false, Help: `Choose how chunker should handle files with missing or invalid chunks.`, Examples: []fs.OptionExample{ { Value: "true", Help: "Report errors and abort current command.", }, { Value: "false", Help: "Warn user, skip incomplete file and proceed.", }, }, }, { Name: "transactions", Advanced: true, Default: "rename", Help: `Choose how chunker should handle temporary files during transactions.`, Hide: fs.OptionHideCommandLine, Examples: []fs.OptionExample{ { Value: "rename", Help: "Rename temporary files after a successful transaction.", }, { Value: "norename", Help: `Leave temporary file names and write transaction ID to metadata file. Metadata is required for no rename transactions (meta format cannot be "none"). If you are using norename transactions you should be careful not to downgrade Rclone as older versions of Rclone don't support this transaction style and will misinterpret files manipulated by norename transactions. This method is EXPERIMENTAL, don't use on production systems.`, }, { Value: "auto", Help: `Rename or norename will be used depending on capabilities of the backend. If meta format is set to "none", rename transactions will always be used. This method is EXPERIMENTAL, don't use on production systems.`, }, }, }}, }) } // NewFs constructs an Fs from the path, container:path func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs, error) { // Parse config into Options struct opt := new(Options) err := configstruct.Set(m, opt) if err != nil { return nil, err } if opt.StartFrom < 0 { return nil, errors.New("start_from must be non-negative") } remote := opt.Remote if strings.HasPrefix(remote, name+":") { return nil, errors.New("can't point remote at itself - check the value of the remote setting") } baseName, basePath, err := fspath.SplitFs(remote) if err != nil { return nil, errors.Wrapf(err, "failed to parse remote %q to wrap", remote) } // Look for a file first remotePath := fspath.JoinRootPath(basePath, rpath) baseFs, err := cache.Get(ctx, baseName+remotePath) if err != fs.ErrorIsFile && err != nil { return nil, errors.Wrapf(err, "failed to make remote %q to wrap", baseName+remotePath) } if !operations.CanServerSideMove(baseFs) { return nil, errors.New("can't use chunker on a backend which doesn't support server-side move or copy") } f := &Fs{ base: baseFs, name: name, root: rpath, opt: *opt, } cache.PinUntilFinalized(f.base, f) f.dirSort = true // processEntries requires that meta Objects prerun data chunks atm. if err := f.configure(opt.NameFormat, opt.MetaFormat, opt.HashType, opt.Transactions); err != nil { return nil, err } // Handle the tricky case detected by FsMkdir/FsPutFiles/FsIsFile // when `rpath` points to a composite multi-chunk file without metadata, // i.e. `rpath` does not exist in the wrapped remote, but chunker // detects a composite file because it finds the first chunk! // (yet can't satisfy fstest.CheckListing, will ignore) if err == nil && !f.useMeta && strings.Contains(rpath, "/") { firstChunkPath := f.makeChunkName(remotePath, 0, "", "") _, testErr := cache.Get(ctx, baseName+firstChunkPath) if testErr == fs.ErrorIsFile { err = testErr } } // Note 1: the features here are ones we could support, and they are // ANDed with the ones from wrappedFs. // Note 2: features.Fill() points features.PutStream to our PutStream, // but features.Mask() will nullify it if wrappedFs does not have it. f.features = (&fs.Features{ CaseInsensitive: true, DuplicateFiles: true, ReadMimeType: false, // Object.MimeType not supported WriteMimeType: true, BucketBased: true, CanHaveEmptyDirectories: true, ServerSideAcrossConfigs: true, }).Fill(ctx, f).Mask(ctx, baseFs).WrapsFs(f, baseFs) f.features.Disable("ListR") // Recursive listing may cause chunker skip files return f, err } // Options defines the configuration for this backend type Options struct { Remote string `config:"remote"` ChunkSize fs.SizeSuffix `config:"chunk_size"` NameFormat string `config:"name_format"` StartFrom int `config:"start_from"` MetaFormat string `config:"meta_format"` HashType string `config:"hash_type"` FailHard bool `config:"fail_hard"` Transactions string `config:"transactions"` } // Fs represents a wrapped fs.Fs type Fs struct { name string root string base fs.Fs // remote wrapped by chunker overlay wrapper fs.Fs // wrapper is used by SetWrapper useMeta bool // false if metadata format is 'none' useMD5 bool // mutually exclusive with useSHA1 useSHA1 bool // mutually exclusive with useMD5 hashFallback bool // allows fallback from MD5 to SHA1 and vice versa hashAll bool // hash all files, mutually exclusive with hashFallback dataNameFmt string // name format of data chunks ctrlNameFmt string // name format of control chunks nameRegexp *regexp.Regexp // regular expression to match chunk names xactIDRand *rand.Rand // generator of random transaction identifiers xactIDMutex sync.Mutex // mutex for the source of randomness opt Options // copy of Options features *fs.Features // optional features dirSort bool // reserved for future, ignored useNoRename bool // can be set with the transactions option } // configure sets up chunker for given name format, meta format and hash type. // It also seeds the source of random transaction identifiers. // configure must be called only from NewFs or by unit tests. func (f *Fs) configure(nameFormat, metaFormat, hashType, transactionMode string) error { if err := f.setChunkNameFormat(nameFormat); err != nil { return errors.Wrapf(err, "invalid name format '%s'", nameFormat) } if err := f.setMetaFormat(metaFormat); err != nil { return err } if err := f.setHashType(hashType); err != nil { return err } if err := f.setTransactionMode(transactionMode); err != nil { return err } randomSeed := time.Now().UnixNano() f.xactIDRand = rand.New(rand.NewSource(randomSeed)) return nil } func (f *Fs) setMetaFormat(metaFormat string) error { switch metaFormat { case "none": f.useMeta = false case "simplejson": f.useMeta = true default: return fmt.Errorf("unsupported meta format '%s'", metaFormat) } return nil } // setHashType // must be called *after* setMetaFormat. // // In the "All" mode chunker will force metadata on all files // if the wrapped remote can't provide given hashsum. func (f *Fs) setHashType(hashType string) error { f.useMD5 = false f.useSHA1 = false f.hashFallback = false f.hashAll = false requireMetaHash := true switch hashType { case "none": requireMetaHash = false case "md5": f.useMD5 = true case "sha1": f.useSHA1 = true case "md5quick": f.useMD5 = true f.hashFallback = true case "sha1quick": f.useSHA1 = true f.hashFallback = true case "md5all": f.useMD5 = true f.hashAll = !f.base.Hashes().Contains(hash.MD5) || f.base.Features().SlowHash case "sha1all": f.useSHA1 = true f.hashAll = !f.base.Hashes().Contains(hash.SHA1) || f.base.Features().SlowHash default: return fmt.Errorf("unsupported hash type '%s'", hashType) } if requireMetaHash && !f.useMeta { return fmt.Errorf("hash type '%s' requires compatible meta format", hashType) } return nil } func (f *Fs) setTransactionMode(transactionMode string) error { switch transactionMode { case "rename": f.useNoRename = false case "norename": if !f.useMeta { return errors.New("incompatible transaction options") } f.useNoRename = true case "auto": f.useNoRename = !f.CanQuickRename() if f.useNoRename && !f.useMeta { f.useNoRename = false return errors.New("using norename transactions requires metadata") } default: return fmt.Errorf("unsupported transaction mode '%s'", transactionMode) } return nil } // setChunkNameFormat converts pattern based chunk name format // into Printf format and Regular expressions for data and // control chunks. func (f *Fs) setChunkNameFormat(pattern string) error { // validate pattern if strings.Count(pattern, "*") != 1 { return errors.New("pattern must have exactly one asterisk (*)") } numDigits := strings.Count(pattern, "#") if numDigits < 1 { return errors.New("pattern must have a hash character (#)") } if strings.Index(pattern, "*") > strings.Index(pattern, "#") { return errors.New("asterisk (*) in pattern must come before hashes (#)") } if ok, _ := regexp.MatchString("^[^#]*[#]+[^#]*$", pattern); !ok { return errors.New("hashes (#) in pattern must be consecutive") } if dir, _ := path.Split(pattern); dir != "" { return errors.New("directory separator prohibited") } if pattern[0] != '*' { return errors.New("pattern must start with asterisk") // to be lifted later } // craft a unified regular expression for all types of chunks reHashes := regexp.MustCompile("[#]+") reDigits := "[0-9]+" if numDigits > 1 { reDigits = fmt.Sprintf("[0-9]{%d,}", numDigits) } reDataOrCtrl := fmt.Sprintf("(?:(%s)|_(%s))", reDigits, ctrlTypeRegStr) // this must be non-greedy or else it could eat up temporary suffix const mainNameRegStr = "(.+?)" strRegex := regexp.QuoteMeta(pattern) strRegex = reHashes.ReplaceAllLiteralString(strRegex, reDataOrCtrl) strRegex = strings.Replace(strRegex, "\\*", mainNameRegStr, -1) strRegex = fmt.Sprintf("^%s(?:%s|%s)?$", strRegex, tempSuffixRegStr, tempSuffixRegOld) f.nameRegexp = regexp.MustCompile(strRegex) // craft printf formats for active data/control chunks fmtDigits := "%d" if numDigits > 1 { fmtDigits = fmt.Sprintf("%%0%dd", numDigits) } strFmt := strings.Replace(pattern, "%", "%%", -1) strFmt = strings.Replace(strFmt, "*", "%s", 1) f.dataNameFmt = reHashes.ReplaceAllLiteralString(strFmt, fmtDigits) f.ctrlNameFmt = reHashes.ReplaceAllLiteralString(strFmt, "_%s") return nil } // makeChunkName produces chunk name (or path) for a given file. // // filePath can be name, relative or absolute path of main file. // // chunkNo must be a zero based index of data chunk. // Negative chunkNo e.g. -1 indicates a control chunk. // ctrlType is type of control chunk (must be valid). // ctrlType must be "" for data chunks. // // xactID is a transaction identifier. Empty xactID denotes active chunk, // otherwise temporary chunk name is produced. // func (f *Fs) makeChunkName(filePath string, chunkNo int, ctrlType, xactID string) string { dir, parentName := path.Split(filePath) var name, tempSuffix string switch { case chunkNo >= 0 && ctrlType == "": name = fmt.Sprintf(f.dataNameFmt, parentName, chunkNo+f.opt.StartFrom) case chunkNo < 0 && ctrlTypeRegexp.MatchString(ctrlType): name = fmt.Sprintf(f.ctrlNameFmt, parentName, ctrlType) default: panic("makeChunkName: invalid argument") // must not produce something we can't consume } if xactID != "" { tempSuffix = fmt.Sprintf(tempSuffixFormat, xactID) if !tempSuffixRegexp.MatchString(tempSuffix) { panic("makeChunkName: invalid argument") } } return dir + name + tempSuffix } // parseChunkName checks whether given file path belongs to // a chunk and extracts chunk name parts. // // filePath can be name, relative or absolute path of a file. // // Returned parentPath is path of the composite file owning the chunk. // It's a non-empty string if valid chunk name is detected // or "" if it's not a chunk. // Other returned values depend on detected chunk type: // data or control, active or temporary: // // data chunk - the returned chunkNo is non-negative and ctrlType is "" // control chunk - the chunkNo is -1 and ctrlType is a non-empty string // active chunk - the returned xactID is "" // temporary chunk - the xactID is a non-empty string func (f *Fs) parseChunkName(filePath string) (parentPath string, chunkNo int, ctrlType, xactID string) { dir, name := path.Split(filePath) match := f.nameRegexp.FindStringSubmatch(name) if match == nil || match[1] == "" { return "", -1, "", "" } var err error chunkNo = -1 if match[2] != "" { if chunkNo, err = strconv.Atoi(match[2]); err != nil { chunkNo = -1 } if chunkNo -= f.opt.StartFrom; chunkNo < 0 { fs.Infof(f, "invalid data chunk number in file %q", name) return "", -1, "", "" } } if match[4] != "" { xactID = match[4] } if match[5] != "" { // old-style temporary suffix number, err := strconv.ParseInt(match[5], 10, 64) if err != nil || number < 0 { fs.Infof(f, "invalid old-style transaction number in file %q", name) return "", -1, "", "" } // convert old-style transaction number to base-36 transaction ID xactID = fmt.Sprintf(tempSuffixFormat, strconv.FormatInt(number, 36)) xactID = xactID[1:] // strip leading underscore } parentPath = dir + match[1] ctrlType = match[3] return } // forbidChunk prints error message or raises error if file is chunk. // First argument sets log prefix, use `false` to suppress message. func (f *Fs) forbidChunk(o interface{}, filePath string) error { if parentPath, _, _, _ := f.parseChunkName(filePath); parentPath != "" { if f.opt.FailHard { return fmt.Errorf("chunk overlap with %q", parentPath) } if boolVal, isBool := o.(bool); !isBool || boolVal { fs.Errorf(o, "chunk overlap with %q", parentPath) } } return nil } // newXactID produces a sufficiently random transaction identifier. // // The temporary suffix mask allows identifiers consisting of 4-9 // base-36 digits (ie. digits 0-9 or lowercase letters a-z). // The identifiers must be unique between transactions running on // the single file in parallel. // // Currently the function produces 6-character identifiers. // Together with underscore this makes a 7-character temporary suffix. // // The first 4 characters isolate groups of transactions by time intervals. // The maximum length of interval is base-36 "zzzz" ie. 1,679,615 seconds. // The function rather takes a maximum prime closest to this number // (see https://primes.utm.edu) as the interval length to better safeguard // against repeating pseudo-random sequences in cases when rclone is // invoked from a periodic scheduler like unix cron. // Thus, the interval is slightly more than 19 days 10 hours 33 minutes. // // The remaining 2 base-36 digits (in the range from 0 to 1295 inclusive) // are taken from the local random source. // This provides about 0.1% collision probability for two parallel // operations started at the same second and working on the same file. // // Non-empty filePath argument enables probing for existing temporary chunk // to further eliminate collisions. func (f *Fs) newXactID(ctx context.Context, filePath string) (xactID string, err error) { const closestPrimeZzzzSeconds = 1679609 const maxTwoBase36Digits = 1295 unixSec := time.Now().Unix() if unixSec < 0 { unixSec = -unixSec // unlikely but the number must be positive } circleSec := unixSec % closestPrimeZzzzSeconds first4chars := strconv.FormatInt(circleSec, 36) for tries := 0; tries < maxTransactionProbes; tries++ { f.xactIDMutex.Lock() randomness := f.xactIDRand.Int63n(maxTwoBase36Digits + 1) f.xactIDMutex.Unlock() last2chars := strconv.FormatInt(randomness, 36) xactID = fmt.Sprintf("%04s%02s", first4chars, last2chars) if filePath == "" { return } probeChunk := f.makeChunkName(filePath, 0, "", xactID) _, probeErr := f.base.NewObject(ctx, probeChunk) if probeErr != nil { return } } return "", fmt.Errorf("can't setup transaction for %s", filePath) } // List the objects and directories in dir into entries. // The entries can be returned in any order but should be // for a complete directory. // // dir should be "" to list the root, and should not have // trailing slashes. // // This should return ErrDirNotFound if the directory isn't found. // // Commands normally cleanup all temporary chunks in case of a failure. // However, if rclone dies unexpectedly, it can leave behind a bunch of // hidden temporary chunks. List and its underlying chunkEntries() // silently skip all temporary chunks in the directory. It's okay if // they belong to an unfinished command running in parallel. // // However, there is no way to discover dead temporary chunks atm. // As a workaround users can use `purge` to forcibly remove the whole // directory together with dead chunks. // In future a flag named like `--chunker-list-hidden` may be added to // rclone that will tell List to reveal hidden chunks. // func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { entries, err = f.base.List(ctx, dir) if err != nil { return nil, err } return f.processEntries(ctx, entries, dir) } // ListR lists the objects and directories of the Fs starting // from dir recursively into out. // // dir should be "" to start from the root, and should not // have trailing slashes. // // This should return ErrDirNotFound if the directory isn't // found. // // It should call callback for each tranche of entries read. // These need not be returned in any particular order. If // callback returns an error then the listing will stop // immediately. // // Don't implement this unless you have a more efficient way // of listing recursively than doing a directory traversal. func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { do := f.base.Features().ListR return do(ctx, dir, func(entries fs.DirEntries) error { newEntries, err := f.processEntries(ctx, entries, dir) if err != nil { return err } return callback(newEntries) }) } // processEntries assembles chunk entries into composite entries func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirPath string) (newEntries fs.DirEntries, err error) { var sortedEntries fs.DirEntries if f.dirSort { // sort entries so that meta objects go before their chunks sortedEntries = make(fs.DirEntries, len(origEntries)) copy(sortedEntries, origEntries) sort.Sort(sortedEntries) } else { sortedEntries = origEntries } byRemote := make(map[string]*Object) badEntry := make(map[string]bool) isSubdir := make(map[string]bool) txnByRemote := map[string]string{} var tempEntries fs.DirEntries for _, dirOrObject := range sortedEntries { switch entry := dirOrObject.(type) { case fs.Object: remote := entry.Remote() mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(remote) if mainRemote == "" { // this is meta object or standalone file object := f.newObject("", entry, nil) byRemote[remote] = object tempEntries = append(tempEntries, object) if f.useNoRename { txnByRemote[remote], err = object.readXactID(ctx) if err != nil { return nil, err } } break } // this is some kind of chunk // metobject should have been created above if present mainObject := byRemote[mainRemote] isSpecial := xactID != txnByRemote[mainRemote] || ctrlType != "" if mainObject == nil && f.useMeta && !isSpecial { fs.Debugf(f, "skip orphan data chunk %q", remote) break } if mainObject == nil && !f.useMeta { // this is the "nometa" case // create dummy chunked object without metadata mainObject = f.newObject(mainRemote, nil, nil) byRemote[mainRemote] = mainObject if !badEntry[mainRemote] { tempEntries = append(tempEntries, mainObject) } } if isSpecial { if revealHidden { fs.Infof(f, "ignore non-data chunk %q", remote) } // need to read metadata to ensure actual object type // no need to read if metaobject is too big or absent, // use the fact that before calling validate() // the `size` field caches metaobject size, if any if f.useMeta && mainObject != nil && mainObject.size <= maxMetadataSize { mainObject.unsure = true } break } if err := mainObject.addChunk(entry, chunkNo); err != nil { if f.opt.FailHard { return nil, err } badEntry[mainRemote] = true } case fs.Directory: isSubdir[entry.Remote()] = true wrapDir := fs.NewDirCopy(ctx, entry) wrapDir.SetRemote(entry.Remote()) tempEntries = append(tempEntries, wrapDir) default: if f.opt.FailHard { return nil, fmt.Errorf("unknown object type %T", entry) } fs.Debugf(f, "unknown object type %T", entry) } } for _, entry := range tempEntries { if object, ok := entry.(*Object); ok { remote := object.Remote() if isSubdir[remote] { if f.opt.FailHard { return nil, fmt.Errorf("%q is both meta object and directory", remote) } badEntry[remote] = true // fall thru } if badEntry[remote] { fs.Debugf(f, "invalid directory entry %q", remote) continue } if err := object.validate(); err != nil { if f.opt.FailHard { return nil, err } fs.Debugf(f, "invalid chunks in object %q", remote) continue } } newEntries = append(newEntries, entry) } if f.dirSort { sort.Sort(newEntries) } return newEntries, nil } // NewObject finds the Object at remote. // // Please note that every NewObject invocation will scan the whole directory. // Using here something like fs.DirCache might improve performance // (yet making the logic more complex). // // Note that chunker prefers analyzing file names rather than reading // the content of meta object assuming that directory scans are fast // but opening even a small file can be slow on some backends. // func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { return f.scanObject(ctx, remote, false) } // scanObject is like NewObject with optional quick scan mode. // The quick mode avoids directory requests other than `List`, // ignores non-chunked objects and skips chunk size checks. func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.Object, error) { if err := f.forbidChunk(false, remote); err != nil { return nil, errors.Wrap(err, "can't access") } var ( o *Object baseObj fs.Object currentXactID string err error sameMain bool ) if f.useMeta { baseObj, err = f.base.NewObject(ctx, remote) if err != nil { return nil, err } remote = baseObj.Remote() // Chunker's meta object cannot be large and maxMetadataSize acts // as a hard limit. Anything larger than that is treated as a // non-chunked file without even checking its contents, so it's // paramount to prevent metadata from exceeding the maximum size. // Anything smaller is additionally checked for format. o = f.newObject("", baseObj, nil) if o.size > maxMetadataSize { return o, nil } } else { // Metadata is disabled, hence this is either a multi-chunk // composite file without meta object or a non-chunked file. // Create an empty wrapper here, scan directory to determine // which case it is and postpone reading if it's the latter one. o = f.newObject(remote, nil, nil) } // If the object is small, it's probably a meta object. // However, composite file must have data chunks besides it. // Scan directory for possible data chunks now and decide later on. dir := path.Dir(strings.TrimRight(remote, "/")) if dir == "." { dir = "" } entries, err := f.base.List(ctx, dir) switch err { case nil: // OK, fall thru case fs.ErrorDirNotFound: entries = nil default: return nil, errors.Wrap(err, "can't detect composite file") } if f.useNoRename { currentXactID, err = o.readXactID(ctx) if err != nil { return nil, err } } caseInsensitive := f.features.CaseInsensitive for _, dirOrObject := range entries { entry, ok := dirOrObject.(fs.Object) if !ok { continue } entryRemote := entry.Remote() if !caseInsensitive && !strings.Contains(entryRemote, remote) { continue // bypass regexp to save cpu } mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(entryRemote) if mainRemote == "" { continue // skip non-chunks } if caseInsensitive { sameMain = strings.EqualFold(mainRemote, remote) } else { sameMain = mainRemote == remote } if !sameMain { continue // skip alien chunks } if ctrlType != "" || xactID != currentXactID { if f.useMeta { // temporary/control chunk calls for lazy metadata read o.unsure = true } continue } //fs.Debugf(f, "%q belongs to %q as chunk %d", entryRemote, mainRemote, chunkNo) if err := o.addChunk(entry, chunkNo); err != nil { return nil, err } } if o.main == nil && (o.chunks == nil || len(o.chunks) == 0) { // Scanning hasn't found data chunks with conforming names. if f.useMeta || quickScan { // Metadata is required but absent and there are no chunks. return nil, fs.ErrorObjectNotFound } // Data chunks are not found and metadata is disabled. // Thus, we are in the "latter case" from above. // Let's try the postponed reading of a non-chunked file and add it // as a single chunk to the empty composite wrapper created above // with nil metadata. baseObj, err = f.base.NewObject(ctx, remote) if err == nil { err = o.addChunk(baseObj, 0) } if err != nil { return nil, err } } // This is either a composite object with metadata or a non-chunked // file without metadata. Validate it and update the total data size. // As an optimization, skip metadata reading here - we will call // readMetadata lazily when needed (reading can be expensive). if !quickScan { if err := o.validate(); err != nil { return nil, err } } return o, nil } // readMetadata reads composite object metadata and caches results, // in case of critical errors metadata is not cached. // Returns ErrMetaUnknown if an unsupported metadata format is detected. // If object is not chunked but marked by List or NewObject for recheck, // readMetadata will attempt to parse object as composite with fallback // to non-chunked representation if the attempt fails. func (o *Object) readMetadata(ctx context.Context) error { // return quickly if metadata is absent or has been already cached if !o.f.useMeta { o.isFull = true } if o.isFull { return nil } if !o.isComposite() && !o.unsure { // this for sure is a non-chunked standalone file o.isFull = true return nil } // validate metadata metaObject := o.main if metaObject.Size() > maxMetadataSize { if o.unsure { // this is not metadata but a foreign object o.unsure = false o.chunks = nil // make isComposite return false o.isFull = true // cache results return nil } return ErrMetaTooBig } // size is within limits, perform consistency checks reader, err := metaObject.Open(ctx) if err != nil { return err } metadata, err := ioutil.ReadAll(reader) _ = reader.Close() // ensure file handle is freed on windows if err != nil { return err } switch o.f.opt.MetaFormat { case "simplejson": metaInfo, madeByChunker, err := unmarshalSimpleJSON(ctx, metaObject, metadata) if o.unsure { o.unsure = false if !madeByChunker { // this is not metadata but a foreign object o.chunks = nil // make isComposite return false o.isFull = true // cache results return nil } } switch err { case nil: // fall thru case ErrMetaTooBig, ErrMetaUnknown: return err // return these errors unwrapped for unit tests default: return errors.Wrap(err, "invalid metadata") } if o.size != metaInfo.Size() || len(o.chunks) != metaInfo.nChunks { return errors.New("metadata doesn't match file size") } o.md5 = metaInfo.md5 o.sha1 = metaInfo.sha1 o.xactID = metaInfo.xactID } o.isFull = true // cache results o.xIDCached = true return nil } // readXactID returns the transaction ID stored in the passed metadata object func (o *Object) readXactID(ctx context.Context) (xactID string, err error) { // if xactID has already been read and cahced return it now if o.xIDCached { return o.xactID, nil } // Avoid reading metadata for backends that don't use xactID to identify permanent chunks if !o.f.useNoRename { return "", errors.New("readXactID requires norename transactions") } if o.main == nil { return "", errors.New("readXactID requires valid metaobject") } if o.main.Size() > maxMetadataSize { return "", nil // this was likely not a metadata object, return empty xactID but don't throw error } reader, err := o.main.Open(ctx) if err != nil { return "", err } data, err := ioutil.ReadAll(reader) _ = reader.Close() // ensure file handle is freed on windows if err != nil { return "", err } switch o.f.opt.MetaFormat { case "simplejson": if len(data) > maxMetadataSizeWritten { return "", nil // this was likely not a metadata object, return empty xactID but don't throw error } var metadata metaSimpleJSON err = json.Unmarshal(data, &metadata) if err != nil { return "", nil // this was likely not a metadata object, return empty xactID but don't throw error } xactID = metadata.XactID } o.xactID = xactID o.xIDCached = true return xactID, nil } // put implements Put, PutStream, PutUnchecked, Update func (f *Fs) put( ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options []fs.OpenOption, basePut putFn, action string, target fs.Object) (obj fs.Object, err error) { // Perform consistency checks if err := f.forbidChunk(src, remote); err != nil { return nil, errors.Wrap(err, action+" refused") } if target == nil { // Get target object with a quick directory scan // skip metadata check if target object does not exist. // ignore not-chunked objects, skip chunk size checks. if obj, err := f.scanObject(ctx, remote, true); err == nil { target = obj } } if target != nil { obj := target.(*Object) if err := obj.readMetadata(ctx); err == ErrMetaUnknown { // refuse to update a file of unsupported format return nil, errors.Wrap(err, "refusing to "+action) } } // Prepare to upload c := f.newChunkingReader(src) wrapIn := c.wrapStream(ctx, in, src) var metaObject fs.Object defer func() { if err != nil { c.rollback(ctx, metaObject) } }() baseRemote := remote xactID, errXact := f.newXactID(ctx, baseRemote) if errXact != nil { return nil, errXact } // Transfer chunks data for c.chunkNo = 0; !c.done; c.chunkNo++ { if c.chunkNo > maxSafeChunkNumber { return nil, ErrChunkOverflow } tempRemote := f.makeChunkName(baseRemote, c.chunkNo, "", xactID) size := c.sizeLeft if size > c.chunkSize { size = c.chunkSize } savedReadCount := c.readCount // If a single chunk is expected, avoid the extra rename operation chunkRemote := tempRemote if c.expectSingle && c.chunkNo == 0 && optimizeFirstChunk { chunkRemote = baseRemote } info := f.wrapInfo(src, chunkRemote, size) // Refill chunkLimit and let basePut repeatedly call chunkingReader.Read() c.chunkLimit = c.chunkSize // TODO: handle range/limit options chunk, errChunk := basePut(ctx, wrapIn, info, options...) if errChunk != nil { return nil, errChunk } if size > 0 && c.readCount == savedReadCount && c.expectSingle { // basePut returned success but didn't call chunkingReader's Read. // This is possible if wrapped remote has performed the put by hash // because chunker bridges Hash from source for non-chunked files. // Hence, force Read here to update accounting and hashsums. if err := c.dummyRead(wrapIn, size); err != nil { return nil, err } } if c.sizeLeft == 0 && !c.done { // The file has been apparently put by hash, force completion. c.done = true } // Expected a single chunk but more to come, so name it as usual. if !c.done && chunkRemote != tempRemote { fs.Infof(chunk, "Expected single chunk, got more") chunkMoved, errMove := f.baseMove(ctx, chunk, tempRemote, delFailed) if errMove != nil { silentlyRemove(ctx, chunk) return nil, errMove } chunk = chunkMoved } // Wrapped remote may or may not have seen EOF from chunking reader, // e.g. the box multi-uploader reads exactly the chunk size specified // and skips the "EOF" read. Hence, switch to next limit here. if !(c.chunkLimit == 0 || c.chunkLimit == c.chunkSize || c.sizeTotal == -1 || c.done) { silentlyRemove(ctx, chunk) return nil, fmt.Errorf("destination ignored %d data bytes", c.chunkLimit) } c.chunkLimit = c.chunkSize c.chunks = append(c.chunks, chunk) } // Validate uploaded size if c.sizeTotal != -1 && c.readCount != c.sizeTotal { return nil, fmt.Errorf("incorrect upload size %d != %d", c.readCount, c.sizeTotal) } // Check for input that looks like valid metadata needMeta := len(c.chunks) > 1 if c.readCount <= maxMetadataSize && len(c.chunks) == 1 { _, madeByChunker, _ := unmarshalSimpleJSON(ctx, c.chunks[0], c.smallHead) needMeta = madeByChunker } // Finalize small object as non-chunked. // This can be bypassed, and single chunk with metadata will be // created if forced by consistent hashing or due to unsafe input. if !needMeta && !f.hashAll && f.useMeta { // If previous object was chunked, remove its chunks f.removeOldChunks(ctx, baseRemote) // Rename single data chunk in place chunk := c.chunks[0] if chunk.Remote() != baseRemote { chunkMoved, errMove := f.baseMove(ctx, chunk, baseRemote, delAlways) if errMove != nil { silentlyRemove(ctx, chunk) return nil, errMove } chunk = chunkMoved } return f.newObject("", chunk, nil), nil } // Validate total size of data chunks var sizeTotal int64 for _, chunk := range c.chunks { sizeTotal += chunk.Size() } if sizeTotal != c.readCount { return nil, fmt.Errorf("incorrect chunks size %d != %d", sizeTotal, c.readCount) } // If previous object was chunked, remove its chunks f.removeOldChunks(ctx, baseRemote) if !f.useNoRename { // The transaction suffix will be removed for backends with quick rename operations for chunkNo, chunk := range c.chunks { chunkRemote := f.makeChunkName(baseRemote, chunkNo, "", "") chunkMoved, errMove := f.baseMove(ctx, chunk, chunkRemote, delFailed) if errMove != nil { return nil, errMove } c.chunks[chunkNo] = chunkMoved } xactID = "" } if !f.useMeta { // Remove stale metadata, if any oldMeta, errOldMeta := f.base.NewObject(ctx, baseRemote) if errOldMeta == nil { silentlyRemove(ctx, oldMeta) } o := f.newObject(baseRemote, nil, c.chunks) o.size = sizeTotal return o, nil } // Update meta object var metadata []byte switch f.opt.MetaFormat { case "simplejson": c.updateHashes() metadata, err = marshalSimpleJSON(ctx, sizeTotal, len(c.chunks), c.md5, c.sha1, xactID) } if err == nil { metaInfo := f.wrapInfo(src, baseRemote, int64(len(metadata))) metaObject, err = basePut(ctx, bytes.NewReader(metadata), metaInfo) } if err != nil { return nil, err } o := f.newObject("", metaObject, c.chunks) o.size = sizeTotal o.xactID = xactID return o, nil } type putFn func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) type chunkingReader struct { baseReader io.Reader sizeTotal int64 sizeLeft int64 readCount int64 chunkSize int64 chunkLimit int64 chunkNo int err error done bool chunks []fs.Object expectSingle bool smallHead []byte fs *Fs hasher gohash.Hash md5 string sha1 string } func (f *Fs) newChunkingReader(src fs.ObjectInfo) *chunkingReader { c := &chunkingReader{ fs: f, chunkSize: int64(f.opt.ChunkSize), sizeTotal: src.Size(), } c.chunkLimit = c.chunkSize c.sizeLeft = c.sizeTotal c.expectSingle = c.sizeTotal >= 0 && c.sizeTotal <= c.chunkSize return c } func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo) io.Reader { baseIn, wrapBack := accounting.UnWrap(in) switch { case c.fs.useMD5: srcObj := fs.UnWrapObjectInfo(src) if srcObj != nil && srcObj.Fs().Features().SlowHash { fs.Debugf(src, "skip slow MD5 on source file, hashing in-transit") c.hasher = md5.New() break } if c.md5, _ = src.Hash(ctx, hash.MD5); c.md5 == "" { if c.fs.hashFallback { c.sha1, _ = src.Hash(ctx, hash.SHA1) } else { c.hasher = md5.New() } } case c.fs.useSHA1: srcObj := fs.UnWrapObjectInfo(src) if srcObj != nil && srcObj.Fs().Features().SlowHash { fs.Debugf(src, "skip slow SHA1 on source file, hashing in-transit") c.hasher = sha1.New() break } if c.sha1, _ = src.Hash(ctx, hash.SHA1); c.sha1 == "" { if c.fs.hashFallback { c.md5, _ = src.Hash(ctx, hash.MD5) } else { c.hasher = sha1.New() } } } if c.hasher != nil { baseIn = io.TeeReader(baseIn, c.hasher) } c.baseReader = baseIn return wrapBack(c) } func (c *chunkingReader) updateHashes() { if c.hasher == nil { return } switch { case c.fs.useMD5: c.md5 = hex.EncodeToString(c.hasher.Sum(nil)) case c.fs.useSHA1: c.sha1 = hex.EncodeToString(c.hasher.Sum(nil)) } } // Note: Read is not called if wrapped remote performs put by hash. func (c *chunkingReader) Read(buf []byte) (bytesRead int, err error) { if c.chunkLimit <= 0 { // Chunk complete - switch to next one. // Note #1: // We might not get here because some remotes (e.g. box multi-uploader) // read the specified size exactly and skip the concluding EOF Read. // Then a check in the put loop will kick in. // Note #2: // The crypt backend after receiving EOF here will call Read again // and we must insist on returning EOF, so we postpone refilling // chunkLimit to the main loop. return 0, io.EOF } if int64(len(buf)) > c.chunkLimit { buf = buf[0:c.chunkLimit] } bytesRead, err = c.baseReader.Read(buf) if err != nil && err != io.EOF { c.err = err c.done = true return } c.accountBytes(int64(bytesRead)) if c.chunkNo == 0 && c.expectSingle && bytesRead > 0 && c.readCount <= maxMetadataSize { c.smallHead = append(c.smallHead, buf[:bytesRead]...) } if bytesRead == 0 && c.sizeLeft == 0 { err = io.EOF // Force EOF when no data left. } if err == io.EOF { c.done = true } return } func (c *chunkingReader) accountBytes(bytesRead int64) { c.readCount += bytesRead c.chunkLimit -= bytesRead if c.sizeLeft != -1 { c.sizeLeft -= bytesRead } } // dummyRead updates accounting, hashsums, etc. by simulating reads func (c *chunkingReader) dummyRead(in io.Reader, size int64) error { if c.hasher == nil && c.readCount+size > maxMetadataSize { c.accountBytes(size) return nil } const bufLen = 1048576 // 1 MiB buf := make([]byte, bufLen) for size > 0 { n := size if n > bufLen { n = bufLen } if _, err := io.ReadFull(in, buf[0:n]); err != nil { return err } size -= n } return nil } // rollback removes uploaded temporary chunks func (c *chunkingReader) rollback(ctx context.Context, metaObject fs.Object) { if metaObject != nil { c.chunks = append(c.chunks, metaObject) } for _, chunk := range c.chunks { if err := chunk.Remove(ctx); err != nil { fs.Errorf(chunk, "Failed to remove temporary chunk: %v", err) } } } func (f *Fs) removeOldChunks(ctx context.Context, remote string) { oldFsObject, err := f.NewObject(ctx, remote) if err == nil { oldObject := oldFsObject.(*Object) for _, chunk := range oldObject.chunks { if err := chunk.Remove(ctx); err != nil { fs.Errorf(chunk, "Failed to remove old chunk: %v", err) } } } } // Put into the remote path with the given modTime and size. // // May create the object even if it returns an error - if so // will return the object and the error, otherwise will return // nil and the error func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { return f.put(ctx, in, src, src.Remote(), options, f.base.Put, "put", nil) } // PutStream uploads to the remote path with the modTime given of indeterminate size func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { return f.put(ctx, in, src, src.Remote(), options, f.base.Features().PutStream, "upload", nil) } // Update in to the object with the modTime given of the given size func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { basePut := o.f.base.Put if src.Size() < 0 { basePut = o.f.base.Features().PutStream if basePut == nil { return errors.New("wrapped file system does not support streaming uploads") } } oNew, err := o.f.put(ctx, in, src, o.Remote(), options, basePut, "update", o) if err == nil { *o = *oNew.(*Object) } return err } // PutUnchecked uploads the object // // This will create a duplicate if we upload a new file without // checking to see if there is one already - use Put() for that. func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { do := f.base.Features().PutUnchecked if do == nil { return nil, errors.New("can't PutUnchecked") } // TODO: handle range/limit options and really chunk stream here! o, err := do(ctx, in, f.wrapInfo(src, "", -1)) if err != nil { return nil, err } return f.newObject("", o, nil), nil } // Hashes returns the supported hash sets. // Chunker advertises a hash type if and only if it can be calculated // for files of any size, non-chunked or composite. func (f *Fs) Hashes() hash.Set { // composites AND no fallback AND (chunker OR wrapped Fs will hash all non-chunked's) if f.useMD5 && !f.hashFallback && (f.hashAll || f.base.Hashes().Contains(hash.MD5)) { return hash.NewHashSet(hash.MD5) } if f.useSHA1 && !f.hashFallback && (f.hashAll || f.base.Hashes().Contains(hash.SHA1)) { return hash.NewHashSet(hash.SHA1) } return hash.NewHashSet() // can't provide strong guarantees } // Mkdir makes the directory (container, bucket) // // Shouldn't return an error if it already exists func (f *Fs) Mkdir(ctx context.Context, dir string) error { if err := f.forbidChunk(dir, dir); err != nil { return errors.Wrap(err, "can't mkdir") } return f.base.Mkdir(ctx, dir) } // Rmdir removes the directory (container, bucket) if empty // // Return an error if it doesn't exist or isn't empty func (f *Fs) Rmdir(ctx context.Context, dir string) error { return f.base.Rmdir(ctx, dir) } // Purge all files in the directory // // Implement this if you have a way of deleting all the files // quicker than just running Remove() on the result of List() // // Return an error if it doesn't exist. // // This command will chain to `purge` from wrapped remote. // As a result it removes not only composite chunker files with their // active chunks but also all hidden temporary chunks in the directory. // func (f *Fs) Purge(ctx context.Context, dir string) error { do := f.base.Features().Purge if do == nil { return fs.ErrorCantPurge } return do(ctx, dir) } // Remove an object (chunks and metadata, if any) // // Remove deletes only active chunks of the composite object. // It does not try to look for temporary chunks because they could belong // to another command modifying this composite file in parallel. // // Commands normally cleanup all temporary chunks in case of a failure. // However, if rclone dies unexpectedly, it can leave hidden temporary // chunks, which cannot be discovered using the `list` command. // Remove does not try to search for such chunks or to delete them. // Sometimes this can lead to strange results e.g. when `list` shows that // directory is empty but `rmdir` refuses to remove it because on the // level of wrapped remote it's actually *not* empty. // As a workaround users can use `purge` to forcibly remove it. // // In future, a flag `--chunker-delete-hidden` may be added which tells // Remove to search directory for hidden chunks and remove them too // (at the risk of breaking parallel commands). // // Remove is the only operation allowed on the composite files with // invalid or future metadata format. // We don't let user copy/move/update unsupported composite files. // Let's at least let her get rid of them, just complain loudly. // // This can litter directory with orphan chunks of unsupported types, // but as long as we remove meta object, even future releases will // treat the composite file as removed and refuse to act upon it. // // Disclaimer: corruption can still happen if unsupported file is removed // and then recreated with the same name. // Unsupported control chunks will get re-picked by a more recent // rclone version with unexpected results. This can be helped by // the `delete hidden` flag above or at least the user has been warned. // func (o *Object) Remove(ctx context.Context) (err error) { if err := o.f.forbidChunk(o, o.Remote()); err != nil { // operations.Move can still call Remove if chunker's Move refuses // to corrupt file in hard mode. Hence, refuse to Remove, too. return errors.Wrap(err, "refuse to corrupt") } if err := o.readMetadata(ctx); err == ErrMetaUnknown { // Proceed but warn user that unexpected things can happen. fs.Errorf(o, "Removing a file with unsupported metadata: %v", err) } // Remove non-chunked file or meta object of a composite file. if o.main != nil { err = o.main.Remove(ctx) } // Remove only active data chunks, ignore any temporary chunks that // might probably be created in parallel by other transactions. for _, chunk := range o.chunks { chunkErr := chunk.Remove(ctx) if err == nil { err = chunkErr } } // There are no known control chunks to remove atm. return err } // copyOrMove implements copy or move func (f *Fs) copyOrMove(ctx context.Context, o *Object, remote string, do copyMoveFn, md5, sha1, opName string) (fs.Object, error) { if err := f.forbidChunk(o, remote); err != nil { return nil, errors.Wrapf(err, "can't %s", opName) } if err := o.readMetadata(ctx); err != nil { // Refuse to copy/move composite files with invalid or future // metadata format which might involve unsupported chunk types. return nil, errors.Wrapf(err, "can't %s this file", opName) } if !o.isComposite() { fs.Debugf(o, "%s non-chunked object...", opName) oResult, err := do(ctx, o.mainChunk(), remote) // chain operation to a single wrapped chunk if err != nil { return nil, err } return f.newObject("", oResult, nil), nil } fs.Debugf(o, "%s %d data chunks...", opName, len(o.chunks)) mainRemote := o.remote var newChunks []fs.Object var err error // Copy/move active data chunks. // Ignore possible temporary chunks being created by parallel operations. for _, chunk := range o.chunks { chunkRemote := chunk.Remote() if !strings.HasPrefix(chunkRemote, mainRemote) { err = fmt.Errorf("invalid chunk name %q", chunkRemote) break } chunkSuffix := chunkRemote[len(mainRemote):] chunkResult, err := do(ctx, chunk, remote+chunkSuffix) if err != nil { break } newChunks = append(newChunks, chunkResult) } // Copy or move old metadata. // There are no known control chunks to move/copy atm. var metaObject fs.Object if err == nil && o.main != nil { metaObject, err = do(ctx, o.main, remote) } if err != nil { for _, chunk := range newChunks { silentlyRemove(ctx, chunk) } return nil, err } // Create wrapping object, calculate and validate total size newObj := f.newObject(remote, metaObject, newChunks) err = newObj.validate() if err != nil { silentlyRemove(ctx, newObj) return nil, err } // Update metadata var metadata []byte switch f.opt.MetaFormat { case "simplejson": metadata, err = marshalSimpleJSON(ctx, newObj.size, len(newChunks), md5, sha1, o.xactID) if err == nil { metaInfo := f.wrapInfo(metaObject, "", int64(len(metadata))) err = newObj.main.Update(ctx, bytes.NewReader(metadata), metaInfo) } case "none": if newObj.main != nil { err = newObj.main.Remove(ctx) } } // Return the composite object if err != nil { silentlyRemove(ctx, newObj) return nil, err } return newObj, nil } type copyMoveFn func(context.Context, fs.Object, string) (fs.Object, error) func (f *Fs) okForServerSide(ctx context.Context, src fs.Object, opName string) (obj *Object, md5, sha1 string, ok bool) { var diff string obj, ok = src.(*Object) switch { case !ok: diff = "remote types" case !operations.SameConfig(f.base, obj.f.base): diff = "wrapped remotes" case f.opt.ChunkSize != obj.f.opt.ChunkSize: diff = "chunk sizes" case f.opt.NameFormat != obj.f.opt.NameFormat: diff = "chunk name formats" case f.opt.StartFrom != obj.f.opt.StartFrom: diff = "chunk numbering" case f.opt.MetaFormat != obj.f.opt.MetaFormat: diff = "meta formats" } if diff != "" { fs.Debugf(src, "Can't %s - different %s", opName, diff) ok = false return } if obj.unsure { // ensure object is composite if need to re-read metadata _ = obj.readMetadata(ctx) } requireMetaHash := obj.isComposite() && f.opt.MetaFormat == "simplejson" if !requireMetaHash && !f.hashAll { ok = true // hash is not required for metadata return } switch { case f.useMD5: md5, _ = obj.Hash(ctx, hash.MD5) ok = md5 != "" if !ok && f.hashFallback { sha1, _ = obj.Hash(ctx, hash.SHA1) ok = sha1 != "" } case f.useSHA1: sha1, _ = obj.Hash(ctx, hash.SHA1) ok = sha1 != "" if !ok && f.hashFallback { md5, _ = obj.Hash(ctx, hash.MD5) ok = md5 != "" } default: ok = false } if !ok { fs.Debugf(src, "Can't %s - required hash not found", opName) } return } // Copy src to this remote using server-side copy operations. // // This is stored with the remote path given // // It returns the destination Object and a possible error // // Will only be called if src.Fs().Name() == f.Name() // // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { baseCopy := f.base.Features().Copy if baseCopy == nil { return nil, fs.ErrorCantCopy } obj, md5, sha1, ok := f.okForServerSide(ctx, src, "copy") if !ok { return nil, fs.ErrorCantCopy } return f.copyOrMove(ctx, obj, remote, baseCopy, md5, sha1, "copy") } // Move src to this remote using server-side move operations. // // This is stored with the remote path given // // It returns the destination Object and a possible error // // Will only be called if src.Fs().Name() == f.Name() // // If it isn't possible then return fs.ErrorCantMove func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { baseMove := func(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { return f.baseMove(ctx, src, remote, delNever) } obj, md5, sha1, ok := f.okForServerSide(ctx, src, "move") if !ok { return nil, fs.ErrorCantMove } return f.copyOrMove(ctx, obj, remote, baseMove, md5, sha1, "move") } // baseMove chains to the wrapped Move or simulates it by Copy+Delete func (f *Fs) baseMove(ctx context.Context, src fs.Object, remote string, delMode int) (fs.Object, error) { var ( dest fs.Object err error ) switch delMode { case delAlways: dest, err = f.base.NewObject(ctx, remote) case delFailed: dest, err = operations.Move(ctx, f.base, nil, remote, src) if err == nil { return dest, err } dest, err = f.base.NewObject(ctx, remote) case delNever: // fall thru, the default } if err != nil { dest = nil } return operations.Move(ctx, f.base, dest, remote, src) } // DirMove moves src, srcRemote to this remote at dstRemote // using server-side move operations. // // Will only be called if src.Fs().Name() == f.Name() // // If it isn't possible then return fs.ErrorCantDirMove // // If destination exists then return fs.ErrorDirExists func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error { do := f.base.Features().DirMove if do == nil { return fs.ErrorCantDirMove } srcFs, ok := src.(*Fs) if !ok { fs.Debugf(srcFs, "Can't move directory - not same remote type") return fs.ErrorCantDirMove } return do(ctx, srcFs.base, srcRemote, dstRemote) } // CleanUp the trash in the Fs // // Implement this if you have a way of emptying the trash or // otherwise cleaning up old versions of files. func (f *Fs) CleanUp(ctx context.Context) error { do := f.base.Features().CleanUp if do == nil { return errors.New("can't CleanUp") } return do(ctx) } // About gets quota information from the Fs func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { do := f.base.Features().About if do == nil { return nil, errors.New("About not supported") } return do(ctx) } // UnWrap returns the Fs that this Fs is wrapping func (f *Fs) UnWrap() fs.Fs { return f.base } // WrapFs returns the Fs that is wrapping this Fs func (f *Fs) WrapFs() fs.Fs { return f.wrapper } // SetWrapper sets the Fs that is wrapping this Fs func (f *Fs) SetWrapper(wrapper fs.Fs) { f.wrapper = wrapper } // ChangeNotify calls the passed function with a path // that has had changes. If the implementation // uses polling, it should adhere to the given interval. // // Replace data chunk names by the name of composite file. // Ignore temporary and control chunks. func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { do := f.base.Features().ChangeNotify if do == nil { return } wrappedNotifyFunc := func(path string, entryType fs.EntryType) { //fs.Debugf(f, "ChangeNotify: path %q entryType %d", path, entryType) if entryType == fs.EntryObject { mainPath, _, _, xactID := f.parseChunkName(path) metaXactID := "" if f.useNoRename { metaObject, _ := f.base.NewObject(ctx, mainPath) dummyObject := f.newObject("", metaObject, nil) metaXactID, _ = dummyObject.readXactID(ctx) } if mainPath != "" && xactID == metaXactID { path = mainPath } } notifyFunc(path, entryType) } do(ctx, wrappedNotifyFunc, pollIntervalChan) } // Shutdown the backend, closing any background tasks and any // cached connections. func (f *Fs) Shutdown(ctx context.Context) error { do := f.base.Features().Shutdown if do == nil { return nil } return do(ctx) } // Object represents a composite file wrapping one or more data chunks type Object struct { remote string main fs.Object // meta object if file is composite, or wrapped non-chunked file, nil if meta format is 'none' chunks []fs.Object // active data chunks if file is composite, or wrapped file as a single chunk if meta format is 'none' size int64 // cached total size of chunks in a composite file or -1 for non-chunked files isFull bool // true if metadata has been read xIDCached bool // true if xactID has been read unsure bool // true if need to read metadata to detect object type xactID string // transaction ID for "norename" or empty string for "renamed" chunks md5 string sha1 string f *Fs } func (o *Object) addChunk(chunk fs.Object, chunkNo int) error { if chunkNo < 0 { return fmt.Errorf("invalid chunk number %d", chunkNo+o.f.opt.StartFrom) } if chunkNo == len(o.chunks) { o.chunks = append(o.chunks, chunk) return nil } if chunkNo > maxSafeChunkNumber { return ErrChunkOverflow } if chunkNo > len(o.chunks) { newChunks := make([]fs.Object, (chunkNo + 1), (chunkNo+1)*2) copy(newChunks, o.chunks) o.chunks = newChunks } if o.chunks[chunkNo] != nil { return fmt.Errorf("duplicate chunk number %d", chunkNo+o.f.opt.StartFrom) } o.chunks[chunkNo] = chunk return nil } // validate verifies the object internals and updates total size func (o *Object) validate() error { if !o.isComposite() { _ = o.mainChunk() // verify that single wrapped chunk exists return nil } metaObject := o.main // this file is composite - o.main refers to meta object (or nil if meta format is 'none') if metaObject != nil && metaObject.Size() > maxMetadataSize { // metadata of a chunked file must be a tiny piece of json o.size = -1 return fmt.Errorf("%q metadata is too large", o.remote) } var totalSize int64 for _, chunk := range o.chunks { if chunk == nil { o.size = -1 return fmt.Errorf("%q has missing chunks", o) } totalSize += chunk.Size() } o.size = totalSize // cache up the total data size return nil } func (f *Fs) newObject(remote string, main fs.Object, chunks []fs.Object) *Object { var size int64 = -1 if main != nil { size = main.Size() if remote == "" { remote = main.Remote() } } return &Object{ remote: remote, main: main, size: size, f: f, chunks: chunks, } } // mainChunk returns: // - a wrapped object for non-chunked files // - meta object for chunked files with metadata // - first chunk for chunked files without metadata // Never returns nil. func (o *Object) mainChunk() fs.Object { if o.main != nil { return o.main // meta object or non-chunked wrapped file } if o.chunks != nil { return o.chunks[0] // first chunk of a chunked composite file } panic("invalid chunked object") // very unlikely } func (o *Object) isComposite() bool { return o.chunks != nil } // Fs returns read only access to the Fs that this object is part of func (o *Object) Fs() fs.Info { return o.f } // Return a string version func (o *Object) String() string { if o == nil { return "" } return o.remote } // Remote returns the remote path func (o *Object) Remote() string { return o.remote } // Size returns the size of the file func (o *Object) Size() int64 { if o.isComposite() { return o.size // total size of data chunks in a composite file } return o.mainChunk().Size() // size of wrapped non-chunked file } // Storable returns whether object is storable func (o *Object) Storable() bool { return o.mainChunk().Storable() } // ModTime returns the modification time of the file func (o *Object) ModTime(ctx context.Context) time.Time { return o.mainChunk().ModTime(ctx) } // SetModTime sets the modification time of the file func (o *Object) SetModTime(ctx context.Context, mtime time.Time) error { if err := o.readMetadata(ctx); err != nil { return err // refuse to act on unsupported format } return o.mainChunk().SetModTime(ctx, mtime) } // Hash returns the selected checksum of the file. // If no checksum is available it returns "". // // Hash won't fail with `unsupported` error but return empty // hash string if a particular hashsum type is not supported // // Hash takes hashsum from metadata if available or requests it // from wrapped remote for non-chunked files. // Metadata (if meta format is not 'none') is by default kept // only for composite files. In the "All" hashing mode chunker // will force metadata on all files if particular hashsum type // is not supported by wrapped remote. // // Note that Hash prefers the wrapped hashsum for non-chunked // file, then tries to read it from metadata. This in theory // handles the unusual case when a small file has been tampered // on the level of wrapped remote but chunker is unaware of that. // func (o *Object) Hash(ctx context.Context, hashType hash.Type) (string, error) { if err := o.readMetadata(ctx); err != nil { return "", err // valid metadata is required to get hash, abort } if !o.isComposite() { // First, chain to the wrapped non-chunked file if possible. if value, err := o.mainChunk().Hash(ctx, hashType); err == nil && value != "" { return value, nil } } // Try hash from metadata if the file is composite or if wrapped remote fails. switch hashType { case hash.MD5: if o.md5 == "" { return "", nil } return o.md5, nil case hash.SHA1: if o.sha1 == "" { return "", nil } return o.sha1, nil default: return "", hash.ErrUnsupported } } // UnWrap returns the wrapped Object func (o *Object) UnWrap() fs.Object { return o.mainChunk() } // Open opens the file for read. Call Close() on the returned io.ReadCloser func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.ReadCloser, err error) { if err := o.readMetadata(ctx); err != nil { // refuse to open unsupported format return nil, errors.Wrap(err, "can't open") } if !o.isComposite() { return o.mainChunk().Open(ctx, options...) // chain to wrapped non-chunked file } var openOptions []fs.OpenOption var offset, limit int64 = 0, -1 for _, option := range options { switch opt := option.(type) { case *fs.SeekOption: offset = opt.Offset case *fs.RangeOption: offset, limit = opt.Decode(o.size) default: // pass Options on to the wrapped open, if appropriate openOptions = append(openOptions, option) } } if offset < 0 { return nil, errors.New("invalid offset") } if limit < 0 { limit = o.size - offset } return o.newLinearReader(ctx, offset, limit, openOptions) } // linearReader opens and reads file chunks sequentially, without read-ahead type linearReader struct { ctx context.Context chunks []fs.Object options []fs.OpenOption limit int64 count int64 pos int reader io.ReadCloser err error } func (o *Object) newLinearReader(ctx context.Context, offset, limit int64, options []fs.OpenOption) (io.ReadCloser, error) { r := &linearReader{ ctx: ctx, chunks: o.chunks, options: options, limit: limit, } // skip to chunk for given offset err := io.EOF for offset >= 0 && err != nil { offset, err = r.nextChunk(offset) } if err == nil || err == io.EOF { r.err = err return r, nil } return nil, err } func (r *linearReader) nextChunk(offset int64) (int64, error) { if r.err != nil { return -1, r.err } if r.pos >= len(r.chunks) || r.limit <= 0 || offset < 0 { return -1, io.EOF } chunk := r.chunks[r.pos] count := chunk.Size() r.pos++ if offset >= count { return offset - count, io.EOF } count -= offset if r.limit < count { count = r.limit } options := append(r.options, &fs.RangeOption{Start: offset, End: offset + count - 1}) if err := r.Close(); err != nil { return -1, err } reader, err := chunk.Open(r.ctx, options...) if err != nil { return -1, err } r.reader = reader r.count = count return offset, nil } func (r *linearReader) Read(p []byte) (n int, err error) { if r.err != nil { return 0, r.err } if r.limit <= 0 { r.err = io.EOF return 0, io.EOF } for r.count <= 0 { // current chunk has been read completely or its size is zero off, err := r.nextChunk(0) if off < 0 { r.err = err return 0, err } } n, err = r.reader.Read(p) if err == nil || err == io.EOF { r.count -= int64(n) r.limit -= int64(n) if r.limit > 0 { err = nil // more data to read } } r.err = err return } func (r *linearReader) Close() (err error) { if r.reader != nil { err = r.reader.Close() r.reader = nil } return } // ObjectInfo describes a wrapped fs.ObjectInfo for being the source type ObjectInfo struct { src fs.ObjectInfo fs *Fs nChunks int // number of data chunks xactID string // transaction ID for "norename" or empty string for "renamed" chunks size int64 // overrides source size by the total size of data chunks remote string // overrides remote name md5 string // overrides MD5 checksum sha1 string // overrides SHA1 checksum } func (f *Fs) wrapInfo(src fs.ObjectInfo, newRemote string, totalSize int64) *ObjectInfo { return &ObjectInfo{ src: src, fs: f, size: totalSize, remote: newRemote, } } // Fs returns read only access to the Fs that this object is part of func (oi *ObjectInfo) Fs() fs.Info { if oi.fs == nil { panic("stub ObjectInfo") } return oi.fs } // String returns string representation func (oi *ObjectInfo) String() string { return oi.src.String() } // Storable returns whether object is storable func (oi *ObjectInfo) Storable() bool { return oi.src.Storable() } // Remote returns the remote path func (oi *ObjectInfo) Remote() string { if oi.remote != "" { return oi.remote } return oi.src.Remote() } // Size returns the size of the file func (oi *ObjectInfo) Size() int64 { if oi.size != -1 { return oi.size } return oi.src.Size() } // ModTime returns the modification time func (oi *ObjectInfo) ModTime(ctx context.Context) time.Time { return oi.src.ModTime(ctx) } // Hash returns the selected checksum of the wrapped file // It returns "" if no checksum is available or if this // info doesn't wrap the complete file. func (oi *ObjectInfo) Hash(ctx context.Context, hashType hash.Type) (string, error) { var errUnsupported error switch hashType { case hash.MD5: if oi.md5 != "" { return oi.md5, nil } case hash.SHA1: if oi.sha1 != "" { return oi.sha1, nil } default: errUnsupported = hash.ErrUnsupported } if oi.Size() != oi.src.Size() { // fail if this info wraps only a part of the file return "", errUnsupported } // chain to full source if possible value, err := oi.src.Hash(ctx, hashType) if err == hash.ErrUnsupported { return "", errUnsupported } return value, err } // ID returns the ID of the Object if known, or "" if not func (o *Object) ID() string { if doer, ok := o.mainChunk().(fs.IDer); ok { return doer.ID() } return "" } // Meta format `simplejson` type metaSimpleJSON struct { // required core fields Version *int `json:"ver"` Size *int64 `json:"size"` // total size of data chunks ChunkNum *int `json:"nchunks"` // number of data chunks // optional extra fields MD5 string `json:"md5,omitempty"` SHA1 string `json:"sha1,omitempty"` XactID string `json:"txn,omitempty"` // transaction ID for norename transactions } // marshalSimpleJSON // // Current implementation creates metadata in three cases: // - for files larger than chunk size // - if file contents can be mistaken as meta object // - if consistent hashing is On but wrapped remote can't provide given hash // func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1, xactID string) ([]byte, error) { version := metadataVersion if xactID == "" && version == 2 { version = 1 } metadata := metaSimpleJSON{ // required core fields Version: &version, Size: &size, ChunkNum: &nChunks, // optional extra fields MD5: md5, SHA1: sha1, XactID: xactID, } data, err := json.Marshal(&metadata) if err == nil && data != nil && len(data) >= maxMetadataSizeWritten { // be a nitpicker, never produce something you can't consume return nil, errors.New("metadata can't be this big, please report to rclone developers") } return data, err } // unmarshalSimpleJSON parses metadata. // // In case of errors returns a flag telling whether input has been // produced by incompatible version of rclone vs wasn't metadata at all. // Only metadata format version 1 is supported atm. // Future releases will transparently migrate older metadata objects. // New format will have a higher version number and cannot be correctly // handled by current implementation. // The version check below will then explicitly ask user to upgrade rclone. // func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte) (info *ObjectInfo, madeByChunker bool, err error) { // Be strict about JSON format // to reduce possibility that a random small file resembles metadata. if len(data) > maxMetadataSizeWritten { return nil, false, ErrMetaTooBig } if data == nil || len(data) < 2 || data[0] != '{' || data[len(data)-1] != '}' { return nil, false, errors.New("invalid json") } var metadata metaSimpleJSON err = json.Unmarshal(data, &metadata) if err != nil { return nil, false, err } // Basic fields are strictly required // to reduce possibility that a random small file resembles metadata. if metadata.Version == nil || metadata.Size == nil || metadata.ChunkNum == nil { return nil, false, errors.New("missing required field") } // Perform strict checks, avoid corruption of future metadata formats. if *metadata.Version < 1 { return nil, false, errors.New("wrong version") } if *metadata.Size < 0 { return nil, false, errors.New("negative file size") } if *metadata.ChunkNum < 0 { return nil, false, errors.New("negative number of chunks") } if *metadata.ChunkNum > maxSafeChunkNumber { return nil, true, ErrChunkOverflow // produced by incompatible version of rclone } if metadata.MD5 != "" { _, err = hex.DecodeString(metadata.MD5) if len(metadata.MD5) != 32 || err != nil { return nil, false, errors.New("wrong md5 hash") } } if metadata.SHA1 != "" { _, err = hex.DecodeString(metadata.SHA1) if len(metadata.SHA1) != 40 || err != nil { return nil, false, errors.New("wrong sha1 hash") } } // ChunkNum is allowed to be 0 in future versions if *metadata.ChunkNum < 1 && *metadata.Version <= metadataVersion { return nil, false, errors.New("wrong number of chunks") } // Non-strict mode also accepts future metadata versions if *metadata.Version > metadataVersion { return nil, true, ErrMetaUnknown // produced by incompatible version of rclone } var nilFs *Fs // nil object triggers appropriate type method info = nilFs.wrapInfo(metaObject, "", *metadata.Size) info.nChunks = *metadata.ChunkNum info.md5 = metadata.MD5 info.sha1 = metadata.SHA1 info.xactID = metadata.XactID return info, true, nil } func silentlyRemove(ctx context.Context, o fs.Object) { _ = o.Remove(ctx) // ignore error } // Name of the remote (as passed into NewFs) func (f *Fs) Name() string { return f.name } // Root of the remote (as passed into NewFs) func (f *Fs) Root() string { return f.root } // Features returns the optional features of this Fs func (f *Fs) Features() *fs.Features { return f.features } // String returns a description of the FS func (f *Fs) String() string { return fmt.Sprintf("Chunked '%s:%s'", f.name, f.root) } // Precision returns the precision of this Fs func (f *Fs) Precision() time.Duration { return f.base.Precision() } // CanQuickRename returns true if the Fs supports a quick rename operation func (f *Fs) CanQuickRename() bool { return f.base.Features().Move != nil } // Check the interfaces are satisfied var ( _ fs.Fs = (*Fs)(nil) _ fs.Purger = (*Fs)(nil) _ fs.Copier = (*Fs)(nil) _ fs.Mover = (*Fs)(nil) _ fs.DirMover = (*Fs)(nil) _ fs.PutUncheckeder = (*Fs)(nil) _ fs.PutStreamer = (*Fs)(nil) _ fs.CleanUpper = (*Fs)(nil) _ fs.UnWrapper = (*Fs)(nil) _ fs.ListRer = (*Fs)(nil) _ fs.Abouter = (*Fs)(nil) _ fs.Wrapper = (*Fs)(nil) _ fs.ChangeNotifier = (*Fs)(nil) _ fs.Shutdowner = (*Fs)(nil) _ fs.ObjectInfo = (*ObjectInfo)(nil) _ fs.Object = (*Object)(nil) _ fs.ObjectUnWrapper = (*Object)(nil) _ fs.IDer = (*Object)(nil) )