cmd/dedupe: make largest directory primary to minimize data moved (#3648)
This change makes dedupe recursively count elements in same-named directories and make the largest one primary. This allows to minimize the amount of data moved (or at least the amount of API calls) when dedupe merges them. It also adds a new fs.Object interface `ParentIDer` with function `ParentID` and implements it for the drive and opendrive backends. This function returns parent directory ID for objects on filesystems that allow same-named dirs. We use it to correctly count sizes of same-named directories. Fixes #2568 Co-authored-by: Ivan Andreev <ivandeex@gmail.com>
This commit is contained in:
parent
6a9ae32012
commit
4d8ef7bca7
5 changed files with 187 additions and 67 deletions
|
@ -596,7 +596,7 @@ type baseObject struct {
|
|||
modifiedDate string // RFC3339 time it was last modified
|
||||
mimeType string // The object MIME type
|
||||
bytes int64 // size of the object
|
||||
parents int // number of parents
|
||||
parents []string // IDs of the parent directories
|
||||
}
|
||||
type documentObject struct {
|
||||
baseObject
|
||||
|
@ -1236,7 +1236,7 @@ func (f *Fs) newBaseObject(remote string, info *drive.File) baseObject {
|
|||
modifiedDate: modifiedDate,
|
||||
mimeType: info.MimeType,
|
||||
bytes: size,
|
||||
parents: len(info.Parents),
|
||||
parents: info.Parents,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2019,6 +2019,9 @@ func (f *Fs) itemToDirEntry(remote string, item *drive.File) (entry fs.DirEntry,
|
|||
f.dirCache.Put(remote, item.Id)
|
||||
when, _ := time.Parse(timeFormatIn, item.ModifiedTime)
|
||||
d := fs.NewDir(remote, when).SetID(item.Id)
|
||||
if len(item.Parents) > 0 {
|
||||
d.SetParentID(item.Parents[0])
|
||||
}
|
||||
return d, nil
|
||||
case f.opt.AuthOwnerOnly && !isAuthOwned(item):
|
||||
// ignore object
|
||||
|
@ -3722,7 +3725,7 @@ func (o *linkObject) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo
|
|||
|
||||
// Remove an object
|
||||
func (o *baseObject) Remove(ctx context.Context) error {
|
||||
if o.parents > 1 {
|
||||
if len(o.parents) > 1 {
|
||||
return errors.New("can't delete safely - has multiple parents")
|
||||
}
|
||||
return o.fs.delete(ctx, shortcutID(o.id), o.fs.opt.UseTrash)
|
||||
|
@ -3738,6 +3741,14 @@ func (o *baseObject) ID() string {
|
|||
return o.id
|
||||
}
|
||||
|
||||
// ParentID returns the ID of the Object parent if known, or "" if not
|
||||
func (o *baseObject) ParentID() string {
|
||||
if len(o.parents) > 0 {
|
||||
return o.parents[0]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (o *documentObject) ext() string {
|
||||
return o.baseObject.remote[len(o.baseObject.remote)-o.extLen:]
|
||||
}
|
||||
|
@ -3798,10 +3809,13 @@ var (
|
|||
_ fs.Object = (*Object)(nil)
|
||||
_ fs.MimeTyper = (*Object)(nil)
|
||||
_ fs.IDer = (*Object)(nil)
|
||||
_ fs.ParentIDer = (*Object)(nil)
|
||||
_ fs.Object = (*documentObject)(nil)
|
||||
_ fs.MimeTyper = (*documentObject)(nil)
|
||||
_ fs.IDer = (*documentObject)(nil)
|
||||
_ fs.ParentIDer = (*documentObject)(nil)
|
||||
_ fs.Object = (*linkObject)(nil)
|
||||
_ fs.MimeTyper = (*linkObject)(nil)
|
||||
_ fs.IDer = (*linkObject)(nil)
|
||||
_ fs.ParentIDer = (*linkObject)(nil)
|
||||
)
|
||||
|
|
|
@ -119,6 +119,7 @@ type Object struct {
|
|||
fs *Fs // what this object is part of
|
||||
remote string // The remote path
|
||||
id string // ID of the file
|
||||
parent string // ID of the parent directory
|
||||
modTime time.Time // The modified time of the object if known
|
||||
md5 string // MD5 hash if known
|
||||
size int64 // Size of the object
|
||||
|
@ -233,7 +234,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
// No root so return old f
|
||||
return f, nil
|
||||
}
|
||||
_, err := tempF.newObjectWithInfo(ctx, remote, nil)
|
||||
_, err := tempF.newObjectWithInfo(ctx, remote, nil, "")
|
||||
if err != nil {
|
||||
if err == fs.ErrorObjectNotFound {
|
||||
// File doesn't exist so return old f
|
||||
|
@ -517,7 +518,7 @@ func (f *Fs) Purge(ctx context.Context, dir string) error {
|
|||
// Return an Object from a path
|
||||
//
|
||||
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
||||
func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, file *File) (fs.Object, error) {
|
||||
func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, file *File, parent string) (fs.Object, error) {
|
||||
// fs.Debugf(nil, "newObjectWithInfo(%s, %v)", remote, file)
|
||||
|
||||
var o *Object
|
||||
|
@ -526,6 +527,7 @@ func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, file *File) (
|
|||
fs: f,
|
||||
remote: remote,
|
||||
id: file.FileID,
|
||||
parent: parent,
|
||||
modTime: time.Unix(file.DateModified, 0),
|
||||
size: file.Size,
|
||||
md5: file.FileHash,
|
||||
|
@ -548,7 +550,7 @@ func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, file *File) (
|
|||
// it returns the error fs.ErrorObjectNotFound.
|
||||
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||
// fs.Debugf(nil, "NewObject(\"%s\")", remote)
|
||||
return f.newObjectWithInfo(ctx, remote, nil)
|
||||
return f.newObjectWithInfo(ctx, remote, nil, "")
|
||||
}
|
||||
|
||||
// Creates from the parameters passed in a half finished Object which
|
||||
|
@ -768,6 +770,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
|||
f.dirCache.Put(remote, folder.FolderID)
|
||||
d := fs.NewDir(remote, time.Unix(folder.DateModified, 0)).SetID(folder.FolderID)
|
||||
d.SetItems(int64(folder.ChildFolders))
|
||||
d.SetParentID(directoryID)
|
||||
entries = append(entries, d)
|
||||
}
|
||||
|
||||
|
@ -775,7 +778,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
|||
file.Name = f.opt.Enc.ToStandardName(file.Name)
|
||||
// fs.Debugf(nil, "File: %s (%s)", file.Name, file.FileID)
|
||||
remote := path.Join(dir, file.Name)
|
||||
o, err := f.newObjectWithInfo(ctx, remote, &file)
|
||||
o, err := f.newObjectWithInfo(ctx, remote, &file, directoryID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1053,6 +1056,11 @@ func (o *Object) ID() string {
|
|||
return o.id
|
||||
}
|
||||
|
||||
// ParentID returns the ID of the Object parent directory if known, or "" if not
|
||||
func (o *Object) ParentID() string {
|
||||
return o.parent
|
||||
}
|
||||
|
||||
// Check the interfaces are satisfied
|
||||
var (
|
||||
_ fs.Fs = (*Fs)(nil)
|
||||
|
@ -1063,4 +1071,5 @@ var (
|
|||
_ fs.DirCacheFlusher = (*Fs)(nil)
|
||||
_ fs.Object = (*Object)(nil)
|
||||
_ fs.IDer = (*Object)(nil)
|
||||
_ fs.ParentIDer = (*Object)(nil)
|
||||
)
|
||||
|
|
12
fs/dir.go
12
fs/dir.go
|
@ -12,6 +12,7 @@ type Dir struct {
|
|||
size int64 // size of directory and contents or -1 if unknown
|
||||
items int64 // number of objects or -1 for unknown
|
||||
id string // optional ID
|
||||
parent string // optional parent directory ID
|
||||
}
|
||||
|
||||
// NewDir creates an unspecialized Directory object
|
||||
|
@ -62,6 +63,17 @@ func (d *Dir) SetID(id string) *Dir {
|
|||
return d
|
||||
}
|
||||
|
||||
// ParentID returns the IDs of the Dir parent if known
|
||||
func (d *Dir) ParentID() string {
|
||||
return d.parent
|
||||
}
|
||||
|
||||
// SetParentID sets the optional parent ID of the Dir
|
||||
func (d *Dir) SetParentID(parent string) *Dir {
|
||||
d.parent = parent
|
||||
return d
|
||||
}
|
||||
|
||||
// ModTime returns the modification date of the file
|
||||
// It should return a best guess if one isn't available
|
||||
func (d *Dir) ModTime(ctx context.Context) time.Time {
|
||||
|
|
6
fs/fs.go
6
fs/fs.go
|
@ -396,6 +396,12 @@ type IDer interface {
|
|||
ID() string
|
||||
}
|
||||
|
||||
// ParentIDer is an optional interface for Object
|
||||
type ParentIDer interface {
|
||||
// ParentID returns the ID of the parent directory if known or nil if not
|
||||
ParentID() string
|
||||
}
|
||||
|
||||
// ObjectUnWrapper is an optional interface for Object
|
||||
type ObjectUnWrapper interface {
|
||||
// UnWrap returns the Object that this Object is wrapping or
|
||||
|
|
|
@ -247,20 +247,82 @@ func (x *DeduplicateMode) Type() string {
|
|||
return "string"
|
||||
}
|
||||
|
||||
// Directory with entry count and links to parents
|
||||
type dedupeDir struct {
|
||||
dir fs.Directory
|
||||
parent string
|
||||
count int
|
||||
}
|
||||
|
||||
// Map of directories by ID with recursive counts
|
||||
type dedupeDirsMap map[string]*dedupeDir
|
||||
|
||||
func (dm dedupeDirsMap) get(id string) *dedupeDir {
|
||||
d := dm[id]
|
||||
if d == nil {
|
||||
d = &dedupeDir{}
|
||||
dm[id] = d
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
func (dm dedupeDirsMap) increment(parent string) {
|
||||
if parent != "" {
|
||||
d := dm.get(parent)
|
||||
d.count++
|
||||
dm.increment(d.parent)
|
||||
}
|
||||
}
|
||||
|
||||
// dedupeFindDuplicateDirs scans f for duplicate directories
|
||||
func dedupeFindDuplicateDirs(ctx context.Context, f fs.Fs) ([][]fs.Directory, error) {
|
||||
func dedupeFindDuplicateDirs(ctx context.Context, f fs.Fs) (duplicateDirs [][]*dedupeDir, err error) {
|
||||
dirsByID := dedupeDirsMap{}
|
||||
dirs := map[string][]*dedupeDir{}
|
||||
|
||||
ci := fs.GetConfig(ctx)
|
||||
dirs := map[string][]fs.Directory{}
|
||||
err := walk.ListR(ctx, f, "", true, ci.MaxDepth, walk.ListDirs, func(entries fs.DirEntries) error {
|
||||
entries.ForDir(func(d fs.Directory) {
|
||||
dirs[d.Remote()] = append(dirs[d.Remote()], d)
|
||||
})
|
||||
err = walk.ListR(ctx, f, "", true, ci.MaxDepth, walk.ListAll, func(entries fs.DirEntries) error {
|
||||
for _, entry := range entries {
|
||||
remote := entry.Remote()
|
||||
parentRemote := path.Dir(remote)
|
||||
if parentRemote == "." {
|
||||
parentRemote = ""
|
||||
}
|
||||
|
||||
// Obtain ID of the object parent, if known.
|
||||
// (This usually means that backend allows duplicate paths)
|
||||
// Fall back to remote parent path, if unavailable.
|
||||
var parent string
|
||||
if entryParentIDer, ok := entry.(fs.ParentIDer); ok {
|
||||
parent = entryParentIDer.ParentID()
|
||||
}
|
||||
if parent == "" {
|
||||
parent = parentRemote
|
||||
}
|
||||
|
||||
var ID string
|
||||
if entryIDer, ok := entry.(fs.IDer); ok {
|
||||
ID = entryIDer.ID()
|
||||
}
|
||||
if ID == "" {
|
||||
ID = remote
|
||||
}
|
||||
|
||||
if fsDir, ok := entry.(fs.Directory); ok {
|
||||
d := dirsByID.get(ID)
|
||||
d.dir = fsDir
|
||||
d.parent = parent
|
||||
dirs[remote] = append(dirs[remote], d)
|
||||
}
|
||||
|
||||
dirsByID.increment(parent)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "find duplicate dirs")
|
||||
}
|
||||
// make sure parents are before children
|
||||
|
||||
// Make sure parents are before children
|
||||
duplicateNames := []string{}
|
||||
for name, ds := range dirs {
|
||||
if len(ds) > 1 {
|
||||
|
@ -268,15 +330,15 @@ func dedupeFindDuplicateDirs(ctx context.Context, f fs.Fs) ([][]fs.Directory, er
|
|||
}
|
||||
}
|
||||
sort.Strings(duplicateNames)
|
||||
duplicateDirs := [][]fs.Directory{}
|
||||
for _, name := range duplicateNames {
|
||||
duplicateDirs = append(duplicateDirs, dirs[name])
|
||||
}
|
||||
return duplicateDirs, nil
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// dedupeMergeDuplicateDirs merges all the duplicate directories found
|
||||
func dedupeMergeDuplicateDirs(ctx context.Context, f fs.Fs, duplicateDirs [][]fs.Directory) error {
|
||||
func dedupeMergeDuplicateDirs(ctx context.Context, f fs.Fs, duplicateDirs [][]*dedupeDir) error {
|
||||
mergeDirs := f.Features().MergeDirs
|
||||
if mergeDirs == nil {
|
||||
return errors.Errorf("%v: can't merge directories", f)
|
||||
|
@ -285,16 +347,31 @@ func dedupeMergeDuplicateDirs(ctx context.Context, f fs.Fs, duplicateDirs [][]fs
|
|||
if dirCacheFlush == nil {
|
||||
return errors.Errorf("%v: can't flush dir cache", f)
|
||||
}
|
||||
for _, dirs := range duplicateDirs {
|
||||
if !SkipDestructive(ctx, dirs[0], "merge duplicate directories") {
|
||||
fs.Infof(dirs[0], "Merging contents of duplicate directories")
|
||||
err := mergeDirs(ctx, dirs)
|
||||
for _, dedupeDirs := range duplicateDirs {
|
||||
if SkipDestructive(ctx, dedupeDirs[0].dir, "merge duplicate directories") {
|
||||
continue
|
||||
}
|
||||
|
||||
// Put largest directory in front to minimize movements
|
||||
fsDirs := []fs.Directory{}
|
||||
largestCount := -1
|
||||
largestIdx := 0
|
||||
for i, d := range dedupeDirs {
|
||||
fsDirs = append(fsDirs, d.dir)
|
||||
if d.count > largestCount {
|
||||
largestIdx = i
|
||||
largestCount = d.count
|
||||
}
|
||||
}
|
||||
fsDirs[largestIdx], fsDirs[0] = fsDirs[0], fsDirs[largestIdx]
|
||||
|
||||
fs.Infof(fsDirs[0], "Merging contents of duplicate directories")
|
||||
err := mergeDirs(ctx, fsDirs)
|
||||
if err != nil {
|
||||
err = fs.CountError(err)
|
||||
fs.Errorf(nil, "merge duplicate dirs: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
dirCacheFlush()
|
||||
return nil
|
||||
}
|
||||
|
@ -335,15 +412,16 @@ func Deduplicate(ctx context.Context, f fs.Fs, mode DeduplicateMode, byHash bool
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(duplicateDirs) != 0 {
|
||||
if len(duplicateDirs) > 0 {
|
||||
if mode != DeduplicateList {
|
||||
err = dedupeMergeDuplicateDirs(ctx, f, duplicateDirs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
for _, dir := range duplicateDirs {
|
||||
fmt.Printf("%s: %d duplicates of this directory\n", dir[0].Remote(), len(dir))
|
||||
for _, dedupeDirs := range duplicateDirs {
|
||||
remote := dedupeDirs[0].dir.Remote()
|
||||
fmt.Printf("%s: %d duplicates of this directory\n", remote, len(dedupeDirs))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -375,7 +453,9 @@ func Deduplicate(ctx context.Context, f fs.Fs, mode DeduplicateMode, byHash bool
|
|||
}
|
||||
|
||||
for remote, objs := range files {
|
||||
if len(objs) > 1 {
|
||||
if len(objs) <= 1 {
|
||||
continue
|
||||
}
|
||||
fs.Logf(remote, "Found %d files with duplicate %s", len(objs), what)
|
||||
if !byHash && mode != DeduplicateList {
|
||||
objs = dedupeDeleteIdentical(ctx, ht, remote, objs)
|
||||
|
@ -411,6 +491,5 @@ func Deduplicate(ctx context.Context, f fs.Fs, mode DeduplicateMode, byHash bool
|
|||
//skip
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue