Factor DirCache from drive into its own module

This commit is contained in:
Nick Craig-Wood 2015-09-03 21:25:55 +01:00
parent c8cd2b510f
commit ea12e446ca
2 changed files with 310 additions and 247 deletions

236
dircache/dircache.go Normal file
View file

@ -0,0 +1,236 @@
// dircache provides a simple cache for caching directory to path lookups
package dircache
import (
"fmt"
"log"
"strings"
"sync"
)
// DirCache caches paths to directory IDs and vice versa
type DirCache struct {
cacheLock sync.RWMutex
cache map[string]string
invCache map[string]string
fs DirCacher // Interface to find and make stuff
trueRootID string // ID of the absolute root
findDirLock sync.Mutex // Protect findDir from concurrent use
root string // the path we are working on
rootID string // ID of the root directory
findRootLock sync.Mutex // Protect findRoot from concurrent use
foundRoot bool // Whether we have found the root or not
}
// DirCache describes an interface for doing the low level directory work
type DirCacher interface {
FindLeaf(pathID, leaf string) (pathIDOut string, found bool, err error)
CreateDir(pathID, leaf string) (newID string, err error)
}
// Make a new DirCache
//
// The cache is safe for concurrent use
func New(root string, trueRootID string, fs DirCacher) *DirCache {
d := &DirCache{
trueRootID: trueRootID,
root: root,
fs: fs,
}
d.Flush()
d.ResetRoot()
return d
}
// Gets an ID given a path
func (dc *DirCache) Get(path string) (id string, ok bool) {
dc.cacheLock.RLock()
id, ok = dc.cache[path]
dc.cacheLock.RUnlock()
return
}
// GetInv gets a path given an ID
func (dc *DirCache) GetInv(path string) (id string, ok bool) {
dc.cacheLock.RLock()
id, ok = dc.invCache[path]
dc.cacheLock.RUnlock()
return
}
// Put a path, id into the map
func (dc *DirCache) Put(path, id string) {
dc.cacheLock.Lock()
dc.cache[path] = id
dc.invCache[id] = path
dc.cacheLock.Unlock()
}
// Flush the map of all data
func (dc *DirCache) Flush() {
dc.cacheLock.Lock()
dc.cache = make(map[string]string)
dc.invCache = make(map[string]string)
dc.cacheLock.Unlock()
}
// Splits a path into directory, leaf
//
// Path shouldn't start or end with a /
//
// If there are no slashes then directory will be "" and leaf = path
func SplitPath(path string) (directory, leaf string) {
lastSlash := strings.LastIndex(path, "/")
if lastSlash >= 0 {
directory = path[:lastSlash]
leaf = path[lastSlash+1:]
} else {
directory = ""
leaf = path
}
return
}
// Finds the directory passed in returning the directory ID starting from pathID
//
// Path shouldn't start or end with a /
//
// If create is set it will make the directory if not found
//
// Algorithm:
// Look in the cache for the path, if found return the pathID
// If not found strip the last path off the path and recurse
// Now have a parent directory id, so look in the parent for self and return it
func (dc *DirCache) FindDir(path string, create bool) (pathID string, err error) {
pathID = dc._findDirInCache(path)
if pathID != "" {
return
}
dc.findDirLock.Lock()
defer dc.findDirLock.Unlock()
return dc._findDir(path, create)
}
// Look for the root and in the cache - safe to call without the findDirLock
func (dc *DirCache) _findDirInCache(path string) string {
// fmt.Println("Finding",path,"create",create,"cache",cache)
// If it is the root, then return it
if path == "" {
// fmt.Println("Root")
return dc.rootID
}
// If it is in the cache then return it
pathID, ok := dc.Get(path)
if ok {
// fmt.Println("Cache hit on", path)
return pathID
}
return ""
}
// Unlocked findDir - must have findDirLock
func (dc *DirCache) _findDir(path string, create bool) (pathID string, err error) {
pathID = dc._findDirInCache(path)
if pathID != "" {
return pathID, nil
}
// Split the path into directory, leaf
directory, leaf := SplitPath(path)
// Recurse and find pathID for parent directory
parentPathID, err := dc._findDir(directory, create)
if err != nil {
return "", err
}
// Find the leaf in parentPathID
pathID, found, err := dc.fs.FindLeaf(parentPathID, leaf)
if err != nil {
return "", err
}
// If not found create the directory if required or return an error
if !found {
if create {
pathID, err = dc.fs.CreateDir(parentPathID, leaf)
if err != nil {
return "", fmt.Errorf("Failed to make directory: %v", err)
}
} else {
return "", fmt.Errorf("Couldn't find directory: %q", path)
}
}
// Store the leaf directory in the cache
dc.Put(path, pathID)
// fmt.Println("Dir", path, "is", pathID)
return pathID, nil
}
// FindPath finds the leaf and directoryID from a path
//
// If create is set parent directories will be created if they don't exist
func (dc *DirCache) FindPath(path string, create bool) (leaf, directoryID string, err error) {
directory, leaf := SplitPath(path)
directoryID, err = dc.FindDir(directory, create)
if err != nil {
if create {
err = fmt.Errorf("Couldn't find or make directory %q: %s", directory, err)
} else {
err = fmt.Errorf("Couldn't find directory %q: %s", directory, err)
}
}
return
}
// Finds the root directory if not already found
//
// Resets the root directory
//
// If create is set it will make the directory if not found
func (dc *DirCache) FindRoot(create bool) error {
dc.findRootLock.Lock()
defer dc.findRootLock.Unlock()
if dc.foundRoot {
return nil
}
rootID, err := dc.FindDir(dc.root, create)
if err != nil {
return err
}
dc.rootID = rootID
dc.Flush()
// Put the root directory in
dc.Put("", dc.rootID)
dc.foundRoot = true
return nil
}
// RootID returns the ID of the root directory
//
// This should be called after FindRoot
func (dc *DirCache) RootID() string {
if dc.rootID == "" {
log.Fatalf("Internal Error: RootID() called before FindRoot")
}
return dc.rootID
}
// Resets the root directory to the absolute root and clears the DirCache
func (dc *DirCache) ResetRoot() {
dc.findRootLock.Lock()
defer dc.findRootLock.Unlock()
dc.foundRoot = false
dc.Flush()
// Put the true root in
dc.rootID = dc.trueRootID
// Put the root directory in
dc.Put("", dc.rootID)
}

View file

@ -13,7 +13,6 @@ import (
"log" "log"
"net/http" "net/http"
"strings" "strings"
"sync"
"time" "time"
"golang.org/x/oauth2" "golang.org/x/oauth2"
@ -21,6 +20,7 @@ import (
"google.golang.org/api/drive/v2" "google.golang.org/api/drive/v2"
"google.golang.org/api/googleapi" "google.golang.org/api/googleapi"
"github.com/ncw/rclone/dircache"
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/ncw/rclone/oauthutil" "github.com/ncw/rclone/oauthutil"
"github.com/spf13/pflag" "github.com/spf13/pflag"
@ -87,11 +87,7 @@ type FsDrive struct {
root string // the path we are working on root string // the path we are working on
client *http.Client // authorized client client *http.Client // authorized client
about *drive.About // information about the drive, including the root about *drive.About // information about the drive, including the root
rootId string // Id of the root directory dirCache *dircache.DirCache // Map of directory path to directory id
foundRoot bool // Whether we have found the root or not
findRootLock sync.Mutex // Protect findRoot from concurrent use
dirCache *dirCache // Map of directory path to directory id
findDirLock sync.Mutex // Protect findDir from concurrent use
pacer chan struct{} // To pace the operations pacer chan struct{} // To pace the operations
sleepTime time.Duration // Time to sleep for each transaction sleepTime time.Duration // Time to sleep for each transaction
} }
@ -107,52 +103,6 @@ type FsObjectDrive struct {
modifiedDate string // RFC3339 time it was last modified modifiedDate string // RFC3339 time it was last modified
} }
// dirCache caches paths to directory Ids and vice versa
type dirCache struct {
sync.RWMutex
cache map[string]string
invCache map[string]string
}
// Make a new locked map
func newDirCache() *dirCache {
d := &dirCache{}
d.Flush()
return d
}
// Gets an Id given a path
func (m *dirCache) Get(path string) (id string, ok bool) {
m.RLock()
id, ok = m.cache[path]
m.RUnlock()
return
}
// GetInv gets a path given an Id
func (m *dirCache) GetInv(path string) (id string, ok bool) {
m.RLock()
id, ok = m.invCache[path]
m.RUnlock()
return
}
// Put a path, id into the map
func (m *dirCache) Put(path, id string) {
m.Lock()
m.cache[path] = id
m.invCache[id] = path
m.Unlock()
}
// Flush the map of all data
func (m *dirCache) Flush() {
m.Lock()
m.cache = make(map[string]string)
m.invCache = make(map[string]string)
m.Unlock()
}
// ------------------------------------------------------------ // ------------------------------------------------------------
// The name of the remote (as passed into NewFs) // The name of the remote (as passed into NewFs)
@ -353,7 +303,6 @@ func NewFs(name, path string) (fs.Fs, error) {
f := &FsDrive{ f := &FsDrive{
name: name, name: name,
root: root, root: root,
dirCache: newDirCache(),
pacer: make(chan struct{}, 1), pacer: make(chan struct{}, 1),
sleepTime: minSleep, sleepTime: minSleep,
} }
@ -376,17 +325,18 @@ func NewFs(name, path string) (fs.Fs, error) {
return nil, fmt.Errorf("Couldn't read info about Drive: %s", err) return nil, fmt.Errorf("Couldn't read info about Drive: %s", err)
} }
// Find the Id of the true root and clear everything f.dirCache = dircache.New(root, f.about.RootFolderId, f)
f.resetRoot()
// Find the current root // Find the current root
err = f.findRoot(false) err = f.dirCache.FindRoot(false)
if err != nil { if err != nil {
// Assume it is a file // Assume it is a file
newRoot, remote := splitPath(root) newRoot, remote := dircache.SplitPath(root)
newF := *f newF := *f
newF.dirCache = dircache.New(newRoot, f.about.RootFolderId, &newF)
newF.root = newRoot newF.root = newRoot
// Make new Fs which is the parent // Make new Fs which is the parent
err = newF.findRoot(false) err = newF.dirCache.FindRoot(false)
if err != nil { if err != nil {
// No root so return old f // No root so return old f
return f, nil return f, nil
@ -399,7 +349,7 @@ func NewFs(name, path string) (fs.Fs, error) {
// return a Fs Limited to this object // return a Fs Limited to this object
return fs.NewLimited(&newF, obj), nil return fs.NewLimited(&newF, obj), nil
} }
// fmt.Printf("Root id %s", f.rootId) // fmt.Printf("Root id %s", f.dirCache.RootID())
return f, nil return f, nil
} }
@ -437,6 +387,39 @@ func (f *FsDrive) NewFsObject(remote string) fs.Object {
return f.newFsObjectWithInfo(remote, nil) return f.newFsObjectWithInfo(remote, nil)
} }
// FindLeaf finds a directory of name leaf in the folder with ID pathId
func (f *FsDrive) FindLeaf(pathId, leaf string) (pathIdOut string, found bool, err error) {
// Find the leaf in pathId
found, err = f.listAll(pathId, leaf, true, false, func(item *drive.File) bool {
if item.Title == leaf {
pathIdOut = item.Id
return true
}
return false
})
return pathIdOut, found, err
}
// CreateDir makes a directory with pathId as parent and name leaf
func (f *FsDrive) CreateDir(pathId, leaf string) (newId string, err error) {
// fmt.Println("Making", path)
// Define the metadata for the directory we are going to create.
createInfo := &drive.File{
Title: leaf,
Description: leaf,
MimeType: driveFolderType,
Parents: []*drive.ParentReference{{Id: pathId}},
}
var info *drive.File
f.call(&err, func() {
info, err = f.svc.Files.Insert(createInfo).Do()
})
if err != nil {
return "", err
}
return info.Id, nil
}
// Path should be directory path either "" or "path/" // Path should be directory path either "" or "path/"
// //
// List the directory using a recursive list from the root // List the directory using a recursive list from the root
@ -542,172 +525,20 @@ func (f *FsDrive) listDirFull(dirId string, path string, out fs.ObjectsChan) err
return nil return nil
} }
// Splits a path into directory, leaf
//
// Path shouldn't start or end with a /
//
// If there are no slashes then directory will be "" and leaf = path
func splitPath(path string) (directory, leaf string) {
lastSlash := strings.LastIndex(path, "/")
if lastSlash >= 0 {
directory = path[:lastSlash]
leaf = path[lastSlash+1:]
} else {
directory = ""
leaf = path
}
return
}
// Finds the directory passed in returning the directory Id starting from pathId
//
// Path shouldn't start or end with a /
//
// If create is set it will make the directory if not found
//
// Algorithm:
// Look in the cache for the path, if found return the pathId
// If not found strip the last path off the path and recurse
// Now have a parent directory id, so look in the parent for self and return it
func (f *FsDrive) findDir(path string, create bool) (pathId string, err error) {
pathId = f._findDirInCache(path)
if pathId != "" {
return
}
f.findDirLock.Lock()
defer f.findDirLock.Unlock()
return f._findDir(path, create)
}
// Look for the root and in the cache - safe to call without the findDirLock
func (f *FsDrive) _findDirInCache(path string) string {
// fmt.Println("Finding",path,"create",create,"cache",cache)
// If it is the root, then return it
if path == "" {
// fmt.Println("Root")
return f.rootId
}
// If it is in the cache then return it
pathId, ok := f.dirCache.Get(path)
if ok {
// fmt.Println("Cache hit on", path)
return pathId
}
return ""
}
// Unlocked findDir - must have findDirLock
func (f *FsDrive) _findDir(path string, create bool) (pathId string, err error) {
pathId = f._findDirInCache(path)
if pathId != "" {
return
}
// Split the path into directory, leaf
directory, leaf := splitPath(path)
// Recurse and find pathId for directory
pathId, err = f._findDir(directory, create)
if err != nil {
return pathId, err
}
// Find the leaf in pathId
found, err := f.listAll(pathId, leaf, true, false, func(item *drive.File) bool {
if item.Title == leaf {
pathId = item.Id
return true
}
return false
})
if err != nil {
return pathId, err
}
// If not found create the directory if required or return an error
if !found {
if create {
// fmt.Println("Making", path)
// Define the metadata for the directory we are going to create.
createInfo := &drive.File{
Title: leaf,
Description: leaf,
MimeType: driveFolderType,
Parents: []*drive.ParentReference{{Id: pathId}},
}
var info *drive.File
f.call(&err, func() {
info, err = f.svc.Files.Insert(createInfo).Do()
})
if err != nil {
return pathId, fmt.Errorf("Failed to make directory: %v", err)
}
pathId = info.Id
} else {
return pathId, fmt.Errorf("Couldn't find directory: %q", path)
}
}
// Store the directory in the cache
f.dirCache.Put(path, pathId)
// fmt.Println("Dir", path, "is", pathId)
return pathId, nil
}
// Finds the root directory if not already found
//
// Resets the root directory
//
// If create is set it will make the directory if not found
func (f *FsDrive) findRoot(create bool) error {
f.findRootLock.Lock()
defer f.findRootLock.Unlock()
if f.foundRoot {
return nil
}
rootId, err := f.findDir(f.root, create)
if err != nil {
return err
}
f.rootId = rootId
f.dirCache.Flush()
// Put the root directory in
f.dirCache.Put("", f.rootId)
f.foundRoot = true
return nil
}
// Resets the root directory to the absolute root and clears the dirCache
func (f *FsDrive) resetRoot() {
f.findRootLock.Lock()
defer f.findRootLock.Unlock()
f.foundRoot = false
f.dirCache.Flush()
// Put the true root in
f.rootId = f.about.RootFolderId
// Put the root directory in
f.dirCache.Put("", f.rootId)
}
// Walk the path returning a channel of FsObjects // Walk the path returning a channel of FsObjects
func (f *FsDrive) List() fs.ObjectsChan { func (f *FsDrive) List() fs.ObjectsChan {
out := make(fs.ObjectsChan, fs.Config.Checkers) out := make(fs.ObjectsChan, fs.Config.Checkers)
go func() { go func() {
defer close(out) defer close(out)
err := f.findRoot(false) err := f.dirCache.FindRoot(false)
if err != nil { if err != nil {
fs.Stats.Error() fs.Stats.Error()
fs.ErrorLog(f, "Couldn't find root: %s", err) fs.ErrorLog(f, "Couldn't find root: %s", err)
} else { } else {
if f.root == "" && *driveFullList { if f.root == "" && *driveFullList {
err = f.listDirFull(f.rootId, "", out) err = f.listDirFull(f.dirCache.RootID(), "", out)
} else { } else {
err = f.listDirRecursive(f.rootId, "", out) err = f.listDirRecursive(f.dirCache.RootID(), "", out)
} }
if err != nil { if err != nil {
fs.Stats.Error() fs.Stats.Error()
@ -723,12 +554,12 @@ func (f *FsDrive) ListDir() fs.DirChan {
out := make(fs.DirChan, fs.Config.Checkers) out := make(fs.DirChan, fs.Config.Checkers)
go func() { go func() {
defer close(out) defer close(out)
err := f.findRoot(false) err := f.dirCache.FindRoot(false)
if err != nil { if err != nil {
fs.Stats.Error() fs.Stats.Error()
fs.ErrorLog(f, "Couldn't find root: %s", err) fs.ErrorLog(f, "Couldn't find root: %s", err)
} else { } else {
_, err := f.listAll(f.rootId, "", true, false, func(item *drive.File) bool { _, err := f.listAll(f.dirCache.RootID(), "", true, false, func(item *drive.File) bool {
dir := &fs.Dir{ dir := &fs.Dir{
Name: item.Title, Name: item.Title,
Bytes: -1, Bytes: -1,
@ -759,10 +590,9 @@ func (f *FsDrive) createFileInfo(remote string, modTime time.Time, size int64) (
bytes: size, bytes: size,
} }
directory, leaf := splitPath(remote) leaf, directoryId, err := f.dirCache.FindPath(remote, true)
directoryId, err := f.findDir(directory, true)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("Couldn't find or make directory: %s", err) return nil, nil, err
} }
// Define the metadata for the file we are going to create. // Define the metadata for the file we are going to create.
@ -816,20 +646,20 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
func (f *FsDrive) Mkdir() error { func (f *FsDrive) Mkdir() error {
return f.findRoot(true) return f.dirCache.FindRoot(true)
} }
// Rmdir deletes the container // Rmdir deletes the container
// //
// Returns an error if it isn't empty // Returns an error if it isn't empty
func (f *FsDrive) Rmdir() error { func (f *FsDrive) Rmdir() error {
err := f.findRoot(false) err := f.dirCache.FindRoot(false)
if err != nil { if err != nil {
return err return err
} }
var children *drive.ChildList var children *drive.ChildList
f.call(&err, func() { f.call(&err, func() {
children, err = f.svc.Children.List(f.rootId).MaxResults(10).Do() children, err = f.svc.Children.List(f.dirCache.RootID()).MaxResults(10).Do()
}) })
if err != nil { if err != nil {
return err return err
@ -841,16 +671,16 @@ func (f *FsDrive) Rmdir() error {
if f.root != "" { if f.root != "" {
f.call(&err, func() { f.call(&err, func() {
if *driveUseTrash { if *driveUseTrash {
_, err = f.svc.Files.Trash(f.rootId).Do() _, err = f.svc.Files.Trash(f.dirCache.RootID()).Do()
} else { } else {
err = f.svc.Files.Delete(f.rootId).Do() err = f.svc.Files.Delete(f.dirCache.RootID()).Do()
} }
}) })
if err != nil { if err != nil {
return err return err
} }
} }
f.resetRoot() f.dirCache.ResetRoot()
return nil return nil
} }
@ -901,18 +731,18 @@ func (f *FsDrive) Purge() error {
if f.root == "" { if f.root == "" {
return fmt.Errorf("Can't purge root directory") return fmt.Errorf("Can't purge root directory")
} }
err := f.findRoot(false) err := f.dirCache.FindRoot(false)
if err != nil { if err != nil {
return err return err
} }
f.call(&err, func() { f.call(&err, func() {
if *driveUseTrash { if *driveUseTrash {
_, err = f.svc.Files.Trash(f.rootId).Do() _, err = f.svc.Files.Trash(f.dirCache.RootID()).Do()
} else { } else {
err = f.svc.Files.Delete(f.rootId).Do() err = f.svc.Files.Delete(f.dirCache.RootID()).Do()
} }
}) })
f.resetRoot() f.dirCache.ResetRoot()
if err != nil { if err != nil {
return err return err
} }
@ -966,17 +796,16 @@ func (dstFs *FsDrive) DirMove(src fs.Fs) error {
} }
// Check if destination exists // Check if destination exists
dstFs.resetRoot() dstFs.dirCache.ResetRoot()
err := dstFs.findRoot(false) err := dstFs.dirCache.FindRoot(false)
if err == nil { if err == nil {
return fs.ErrorDirExists return fs.ErrorDirExists
} }
// Find ID of parent // Find ID of parent
directory, leaf := splitPath(dstFs.root) leaf, directoryId, err := dstFs.dirCache.FindPath(dstFs.root, true)
directoryId, err := dstFs.findDir(directory, true)
if err != nil { if err != nil {
return fmt.Errorf("Couldn't find or make destination directory: %v", err) return err
} }
// Do the move // Do the move
@ -984,11 +813,11 @@ func (dstFs *FsDrive) DirMove(src fs.Fs) error {
Title: leaf, Title: leaf,
Parents: []*drive.ParentReference{{Id: directoryId}}, Parents: []*drive.ParentReference{{Id: directoryId}},
} }
_, err = dstFs.svc.Files.Patch(srcFs.rootId, &patch).Do() _, err = dstFs.svc.Files.Patch(srcFs.dirCache.RootID(), &patch).Do()
if err != nil { if err != nil {
return err return err
} }
srcFs.resetRoot() srcFs.dirCache.ResetRoot()
return nil return nil
} }
@ -1037,11 +866,9 @@ func (o *FsObjectDrive) readMetaData() (err error) {
return nil return nil
} }
directory, leaf := splitPath(o.remote) leaf, directoryId, err := o.drive.dirCache.FindPath(o.remote, false)
directoryId, err := o.drive.findDir(directory, false)
if err != nil { if err != nil {
fs.Debug(o, "Couldn't find directory: %s", err) return err
return fmt.Errorf("Couldn't find directory: %s", err)
} }
found, err := o.drive.listAll(directoryId, leaf, false, true, func(item *drive.File) bool { found, err := o.drive.listAll(directoryId, leaf, false, true, func(item *drive.File) bool {