// Dropbox interface package dropbox /* Limitations of dropbox File system is case insensitive FIXME need to delete metadata when we delete files! FIXME Getting this sometimes Failed to copy: Upload failed: invalid character '<' looking for beginning of value This is a JSON decode error - from Update / UploadByChunk - Caused by 500 error from dropbox - See https://github.com/stacktic/dropbox/issues/1 - Possibly confusing dropbox with excess concurrency? */ import ( "crypto/md5" "errors" "fmt" "io" "log" "strings" "sync" "time" "github.com/ncw/rclone/fs" "github.com/ncw/swift" "github.com/stacktic/dropbox" ) // Constants const ( rcloneAppKey = "5jcck7diasz0rqy" rcloneAppSecret = "1n9m04y2zx7bf26" uploadChunkSize = 64 * 1024 // chunk size for upload metadataLimit = dropbox.MetadataLimitDefault // max items to fetch at once datastoreName = "rclone" tableName = "metadata" md5sumField = "md5sum" mtimeField = "mtime" maxCommitRetries = 5 ) // Register with Fs func init() { fs.Register(&fs.FsInfo{ Name: "dropbox", NewFs: NewFs, Config: Config, Options: []fs.Option{{ Name: "app_key", Help: "Dropbox App Key - leave blank to use rclone's.", }, { Name: "app_secret", Help: "Dropbox App Secret - leave blank to use rclone's.", }}, }) } // Configuration helper - called after the user has put in the defaults func Config(name string) { // See if already have a token token := fs.ConfigFile.MustValue(name, "token") if token != "" { fmt.Printf("Already have a dropbox token - refresh?\n") if !fs.Confirm() { return } } // Get a dropbox db := newDropbox(name) // This method will ask the user to visit an URL and paste the generated code. if err := db.Auth(); err != nil { log.Fatalf("Failed to authorize: %v", err) } // Get the token token = db.AccessToken() // Stuff it in the config file if it has changed old := fs.ConfigFile.MustValue(name, "token") if token != old { fs.ConfigFile.SetValue(name, "token", token) fs.SaveConfig() } } // FsDropbox represents a remote dropbox server type FsDropbox struct { db *dropbox.Dropbox // the connection to the dropbox server root string // the path we are working on slashRoot string // root with "/" prefix slashRootSlash string // root with "/" prefix and postix datastoreManager *dropbox.DatastoreManager datastore *dropbox.Datastore table *dropbox.Table datastoreMutex sync.Mutex // lock this when using the datastore datastoreErr error // pending errors on the datastore } // FsObjectDropbox describes a dropbox object type FsObjectDropbox struct { dropbox *FsDropbox // what this object is part of remote string // The remote path md5sum string // md5sum of the object bytes int64 // size of the object modTime time.Time // time it was last modified } // ------------------------------------------------------------ // String converts this FsDropbox to a string func (f *FsDropbox) String() string { return fmt.Sprintf("Dropbox root '%s'", f.root) } // parseParse parses a dropbox 'url' func parseDropboxPath(path string) (root string, err error) { root = strings.Trim(path, "/") return } // Makes a new dropbox from the config func newDropbox(name string) *dropbox.Dropbox { db := dropbox.NewDropbox() appKey := fs.ConfigFile.MustValue(name, "app_key") if appKey == "" { appKey = rcloneAppKey } appSecret := fs.ConfigFile.MustValue(name, "app_secret") if appSecret == "" { appSecret = rcloneAppSecret } db.SetAppInfo(appKey, appSecret) return db } // NewFs contstructs an FsDropbox from the path, container:path func NewFs(name, path string) (fs.Fs, error) { db := newDropbox(name) root, err := parseDropboxPath(path) if err != nil { return nil, err } slashRoot := "/" + root slashRootSlash := slashRoot if root != "" { slashRootSlash += "/" } f := &FsDropbox{ root: root, slashRoot: slashRoot, slashRootSlash: slashRootSlash, db: db, } // Read the token from the config file token := fs.ConfigFile.MustValue(name, "token") // Authorize the client db.SetAccessToken(token) // Make a db to store rclone metadata in f.datastoreManager = db.NewDatastoreManager() // Open the datastore in the background go func() { f.datastoreMutex.Lock() defer f.datastoreMutex.Unlock() fs.Debug(f, "Open rclone datastore") // Open the rclone datastore f.datastore, err = f.datastoreManager.OpenDatastore(datastoreName) if err != nil { fs.Log(f, "Failed to open datastore: %v", err) f.datastoreErr = err return } // Get the table we are using f.table, err = f.datastore.GetTable(tableName) if err != nil { fs.Log(f, "Failed to open datastore table: %v", err) f.datastoreErr = err return } fs.Debug(f, "Open rclone datastore finished") }() return f, nil } // Return an FsObject from a path func (f *FsDropbox) newFsObjectWithInfo(remote string, info *dropbox.Entry) (fs.Object, error) { o := &FsObjectDropbox{ dropbox: f, remote: remote, } if info != nil { o.setMetadataFromEntry(info) } else { err := o.readEntryAndSetMetadata() if err != nil { // logged already fs.Debug("Failed to read info: %s", err) return nil, err } } return o, nil } // Return an FsObject from a path // // May return nil if an error occurred func (f *FsDropbox) NewFsObjectWithInfo(remote string, info *dropbox.Entry) fs.Object { fs, _ := f.newFsObjectWithInfo(remote, info) // Errors have already been logged return fs } // Return an FsObject from a path // // May return nil if an error occurred func (f *FsDropbox) NewFsObject(remote string) fs.Object { return f.NewFsObjectWithInfo(remote, nil) } // Strips the root off entry and returns it func (f *FsDropbox) stripRoot(entry *dropbox.Entry) string { path := entry.Path if strings.HasPrefix(path, f.slashRootSlash) { path = path[len(f.slashRootSlash):] } return path } // Walk the root returning a channel of FsObjects func (f *FsDropbox) list(out fs.ObjectsChan) { cursor := "" for { deltaPage, err := f.db.Delta(cursor, f.slashRoot) if err != nil { fs.Stats.Error() fs.Log(f, "Couldn't list: %s", err) break } else { if deltaPage.Reset && cursor != "" { fs.Log(f, "Unexpected reset during listing - try again") fs.Stats.Error() break } fs.Debug(f, "%d delta entries received", len(deltaPage.Entries)) for i := range deltaPage.Entries { deltaEntry := &deltaPage.Entries[i] entry := deltaEntry.Entry if entry == nil { // This notifies of a deleted object which we ignore continue } if entry.IsDir { // ignore directories } else { path := f.stripRoot(entry) out <- f.NewFsObjectWithInfo(path, entry) } } if !deltaPage.HasMore { break } cursor = deltaPage.Cursor } } } // Walk the path returning a channel of FsObjects func (f *FsDropbox) List() fs.ObjectsChan { out := make(fs.ObjectsChan, fs.Config.Checkers) go func() { defer close(out) f.list(out) }() return out } // Walk the path returning a channel of FsObjects func (f *FsDropbox) ListDir() fs.DirChan { out := make(fs.DirChan, fs.Config.Checkers) go func() { defer close(out) entry, err := f.db.Metadata(f.root, true, false, "", "", metadataLimit) if err != nil { fs.Stats.Error() fs.Log(f, "Couldn't list directories in root: %s", err) } else { for i := range entry.Contents { entry := &entry.Contents[i] if entry.IsDir { out <- &fs.Dir{ Name: f.stripRoot(entry), When: time.Time(entry.ClientMtime), Bytes: int64(entry.Bytes), Count: -1, } } } } }() return out } // A read closer which doesn't close the input type readCloser struct { in io.Reader } // Read bytes from the object - see io.Reader func (rc *readCloser) Read(p []byte) (n int, err error) { return rc.in.Read(p) } // Dummy close function func (rc *readCloser) Close() error { return nil } // Put the object // // Copy the reader in to the new object which is returned // // The new object may have been created if an error is returned func (f *FsDropbox) Put(in io.Reader, remote string, modTime time.Time, size int64) (fs.Object, error) { // Temporary FsObject under construction o := &FsObjectDropbox{dropbox: f, remote: remote} return o, o.Update(in, modTime, size) } // Mkdir creates the container if it doesn't exist func (f *FsDropbox) Mkdir() error { _, err := f.db.CreateFolder(f.slashRoot) return err } // Rmdir deletes the container // // Returns an error if it isn't empty func (f *FsDropbox) Rmdir() error { entry, err := f.db.Metadata(f.slashRoot, true, false, "", "", 16) if err != nil { return err } if len(entry.Contents) != 0 { return errors.New("Directory not empty") } return f.Purge() } // Return the precision func (fs *FsDropbox) Precision() time.Duration { return time.Nanosecond } // Purge deletes all the files and the container // // Returns an error if it isn't empty func (f *FsDropbox) Purge() error { _, err := f.db.Delete(f.slashRoot) return err } // Tries the transaction in fn then calls commit, repeating until retry limit // // Holds datastore mutex while in progress func (f *FsDropbox) transaction(fn func() error) error { f.datastoreMutex.Lock() defer f.datastoreMutex.Unlock() if f.datastoreErr != nil { return f.datastoreErr } var err error for i := 1; i <= maxCommitRetries; i++ { err = fn() if err != nil { return err } err = f.datastore.Commit() if err == nil { break } fs.Debug(f, "Retrying transaction %d/%d", i, maxCommitRetries) } if err != nil { return fmt.Errorf("Failed to commit metadata changes: %s", err) } return nil } // Reads the record attached to key // // Holds datastore mutex while in progress func (f *FsDropbox) readRecord(key string) (*dropbox.Record, error) { f.datastoreMutex.Lock() defer f.datastoreMutex.Unlock() if f.datastoreErr != nil { return nil, f.datastoreErr } return f.table.Get(key) } // ------------------------------------------------------------ // Return the parent Fs func (o *FsObjectDropbox) Fs() fs.Fs { return o.dropbox } // Return a string version func (o *FsObjectDropbox) String() string { if o == nil { return "" } return o.remote } // Return the remote path func (o *FsObjectDropbox) Remote() string { return o.remote } // Md5sum returns the Md5sum of an object returning a lowercase hex string // // FIXME has to download the file! func (o *FsObjectDropbox) Md5sum() (string, error) { if o.md5sum != "" { return o.md5sum, nil } err := o.readMetaData() if err != nil { fs.Log(o, "Failed to read metadata: %s", err) return "", fmt.Errorf("Failed to read metadata: %s", err) } // For pre-existing files which have no md5sum can read it and set it? // in, err := o.Open() // if err != nil { // return "", err // } // defer in.Close() // hash := md5.New() // _, err = io.Copy(hash, in) // if err != nil { // return "", err // } // o.md5sum = fmt.Sprintf("%x", hash.Sum(nil)) return o.md5sum, nil } // Size returns the size of an object in bytes func (o *FsObjectDropbox) Size() int64 { return o.bytes } // setMetadataFromEntry sets the fs data from a dropbox.Entry // // This isn't a complete set of metadata and has an inacurate date func (o *FsObjectDropbox) setMetadataFromEntry(info *dropbox.Entry) { o.bytes = int64(info.Bytes) o.modTime = time.Time(info.ClientMtime) } // Reads the entry from dropbox func (o *FsObjectDropbox) readEntry() (*dropbox.Entry, error) { entry, err := o.dropbox.db.Metadata(o.remotePath(), false, false, "", "", metadataLimit) if err != nil { fs.Debug(o, "Error reading file: %s", err) return nil, fmt.Errorf("Error reading file: %s", err) } return entry, nil } // Read entry if not set and set metadata from it func (o *FsObjectDropbox) readEntryAndSetMetadata() error { // Last resort set time from client if !o.modTime.IsZero() { return nil } entry, err := o.readEntry() if err != nil { return err } o.setMetadataFromEntry(entry) return nil } // Returns the remote path for the object func (o *FsObjectDropbox) remotePath() string { return o.dropbox.slashRootSlash + o.remote } // Returns the key for the metadata database func (o *FsObjectDropbox) metadataKey() string { // NB File system is case insensitive key := strings.ToLower(o.remotePath()) return fmt.Sprintf("%x", md5.Sum([]byte(key))) } // readMetaData gets the info if it hasn't already been fetched func (o *FsObjectDropbox) readMetaData() (err error) { if o.md5sum != "" { return nil } // fs.Debug(o, "Reading metadata from datastore") record, err := o.dropbox.readRecord(o.metadataKey()) if err != nil { fs.Debug(o, "Couldn't read metadata: %s", err) record = nil } if record != nil { // Read md5sum md5sumInterface, ok, err := record.Get(md5sumField) if err != nil { return err } if !ok { fs.Debug(o, "Couldn't find md5sum in record") } else { md5sum, ok := md5sumInterface.(string) if !ok { fs.Debug(o, "md5sum not a string") } else { o.md5sum = md5sum } } // read mtime mtimeInterface, ok, err := record.Get(mtimeField) if err != nil { return err } if !ok { fs.Debug(o, "Couldn't find mtime in record") } else { mtime, ok := mtimeInterface.(string) if !ok { fs.Debug(o, "mtime not a string") } else { modTime, err := swift.FloatStringToTime(mtime) if err != nil { return err } o.modTime = modTime } } } // Last resort o.readEntryAndSetMetadata() return nil } // ModTime returns the modification time of the object // // It attempts to read the objects mtime and if that isn't present the // LastModified returned in the http headers func (o *FsObjectDropbox) ModTime() time.Time { err := o.readMetaData() if err != nil { fs.Log(o, "Failed to read metadata: %s", err) return time.Now() } return o.modTime } // Sets the modification time of the local fs object into the record // FIXME if we don't set md5sum what will that do? func (o *FsObjectDropbox) setModTimeAndMd5sum(modTime time.Time, md5sum string) error { key := o.metadataKey() // fs.Debug(o, "Writing metadata to datastore") return o.dropbox.transaction(func() error { record, err := o.dropbox.table.GetOrInsert(key) if err != nil { return fmt.Errorf("Couldn't read record: %s", err) } if md5sum != "" { err = record.Set(md5sumField, md5sum) if err != nil { return fmt.Errorf("Couldn't set md5sum record: %s", err) } } if !modTime.IsZero() { mtime := swift.TimeToFloatString(modTime) err := record.Set(mtimeField, mtime) if err != nil { return fmt.Errorf("Couldn't set mtime record: %s", err) } } return nil }) } // Sets the modification time of the local fs object // // Commits the datastore func (o *FsObjectDropbox) SetModTime(modTime time.Time) { err := o.setModTimeAndMd5sum(modTime, "") if err != nil { fs.Stats.Error() fs.Log(o, err.Error()) } } // Is this object storable func (o *FsObjectDropbox) Storable() bool { return true } // Open an object for read func (o *FsObjectDropbox) Open() (in io.ReadCloser, err error) { in, _, err = o.dropbox.db.Download(o.remotePath(), "", 0) return } // Update the already existing object // // Copy the reader into the object updating modTime and size // // The new object may have been created if an error is returned func (o *FsObjectDropbox) Update(in io.Reader, modTime time.Time, size int64) error { // Calculate md5sum as we upload it hash := md5.New() rc := &readCloser{in: io.TeeReader(in, hash)} entry, err := o.dropbox.db.UploadByChunk(rc, uploadChunkSize, o.remotePath(), true, "") if err != nil { return fmt.Errorf("Upload failed: %s", err) } o.setMetadataFromEntry(entry) md5sum := fmt.Sprintf("%x", hash.Sum(nil)) return o.setModTimeAndMd5sum(modTime, md5sum) } // Remove an object func (o *FsObjectDropbox) Remove() error { _, err := o.dropbox.db.Delete(o.remotePath()) return err } // Check the interfaces are satisfied var _ fs.Fs = &FsDropbox{} var _ fs.Purger = &FsDropbox{} var _ fs.Object = &FsObjectDropbox{}