Stop single file and --files-from
operations iterating through the source bucket.
This works by making sure directory listings that use a filter only iterate the files provided in the filter (if any). Single file copies now don't iterate the source or destination buckets. Note that this could potentially slow down very long `--files-from` lists - this is easy to fix (with another flag probably) if it causes anyone a problem. Fixes #610 Fixes #769
This commit is contained in:
parent
ec7cef98d8
commit
d033e92234
5 changed files with 97 additions and 16 deletions
19
fs/filter.go
19
fs/filter.go
|
@ -94,8 +94,8 @@ func (rs *rules) len() int {
|
||||||
return len(rs.rules)
|
return len(rs.rules)
|
||||||
}
|
}
|
||||||
|
|
||||||
// filesMap describes the map of files to transfer
|
// FilesMap describes the map of files to transfer
|
||||||
type filesMap map[string]struct{}
|
type FilesMap map[string]struct{}
|
||||||
|
|
||||||
// Filter describes any filtering in operation
|
// Filter describes any filtering in operation
|
||||||
type Filter struct {
|
type Filter struct {
|
||||||
|
@ -106,8 +106,8 @@ type Filter struct {
|
||||||
ModTimeTo time.Time
|
ModTimeTo time.Time
|
||||||
fileRules rules
|
fileRules rules
|
||||||
dirRules rules
|
dirRules rules
|
||||||
files filesMap // files if filesFrom
|
files FilesMap // files if filesFrom
|
||||||
dirs filesMap // dirs from filesFrom
|
dirs FilesMap // dirs from filesFrom
|
||||||
}
|
}
|
||||||
|
|
||||||
// We use time conventions
|
// We use time conventions
|
||||||
|
@ -313,8 +313,8 @@ func (f *Filter) AddRule(rule string) error {
|
||||||
// AddFile adds a single file to the files from list
|
// AddFile adds a single file to the files from list
|
||||||
func (f *Filter) AddFile(file string) error {
|
func (f *Filter) AddFile(file string) error {
|
||||||
if f.files == nil {
|
if f.files == nil {
|
||||||
f.files = make(filesMap)
|
f.files = make(FilesMap)
|
||||||
f.dirs = make(filesMap)
|
f.dirs = make(FilesMap)
|
||||||
}
|
}
|
||||||
file = strings.Trim(file, "/")
|
file = strings.Trim(file, "/")
|
||||||
f.files[file] = struct{}{}
|
f.files[file] = struct{}{}
|
||||||
|
@ -332,6 +332,13 @@ func (f *Filter) AddFile(file string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Files returns all the files from the `--files-from` list
|
||||||
|
//
|
||||||
|
// It may be nil if the list is empty
|
||||||
|
func (f *Filter) Files() FilesMap {
|
||||||
|
return f.files
|
||||||
|
}
|
||||||
|
|
||||||
// Clear clears all the filter rules
|
// Clear clears all the filter rules
|
||||||
func (f *Filter) Clear() {
|
func (f *Filter) Clear() {
|
||||||
f.fileRules.clear()
|
f.fileRules.clear()
|
||||||
|
|
|
@ -180,11 +180,11 @@ func TestNewFilterIncludeFiles(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = f.AddFile("/file2.jpg")
|
err = f.AddFile("/file2.jpg")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, filesMap{
|
assert.Equal(t, FilesMap{
|
||||||
"file1.jpg": {},
|
"file1.jpg": {},
|
||||||
"file2.jpg": {},
|
"file2.jpg": {},
|
||||||
}, f.files)
|
}, f.files)
|
||||||
assert.Equal(t, filesMap{}, f.dirs)
|
assert.Equal(t, FilesMap{}, f.dirs)
|
||||||
testInclude(t, f, []includeTest{
|
testInclude(t, f, []includeTest{
|
||||||
{"file1.jpg", 0, 0, true},
|
{"file1.jpg", 0, 0, true},
|
||||||
{"file2.jpg", 1, 0, true},
|
{"file2.jpg", 1, 0, true},
|
||||||
|
@ -206,7 +206,7 @@ func TestNewFilterIncludeFilesDirs(t *testing.T) {
|
||||||
err = f.AddFile(path)
|
err = f.AddFile(path)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
assert.Equal(t, filesMap{
|
assert.Equal(t, FilesMap{
|
||||||
"path": {},
|
"path": {},
|
||||||
"path/to": {},
|
"path/to": {},
|
||||||
"path/to/dir": {},
|
"path/to/dir": {},
|
||||||
|
|
8
fs/fs.go
8
fs/fs.go
|
@ -114,6 +114,10 @@ type ListFser interface {
|
||||||
// Fses must support recursion levels of fs.MaxLevel and 1.
|
// Fses must support recursion levels of fs.MaxLevel and 1.
|
||||||
// They may return ErrorLevelNotSupported otherwise.
|
// They may return ErrorLevelNotSupported otherwise.
|
||||||
List(out ListOpts, dir string)
|
List(out ListOpts, dir string)
|
||||||
|
|
||||||
|
// NewObject finds the Object at remote. If it can't be found
|
||||||
|
// it returns the error ErrorObjectNotFound.
|
||||||
|
NewObject(remote string) (Object, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fs is the interface a cloud storage system must provide
|
// Fs is the interface a cloud storage system must provide
|
||||||
|
@ -121,10 +125,6 @@ type Fs interface {
|
||||||
Info
|
Info
|
||||||
ListFser
|
ListFser
|
||||||
|
|
||||||
// NewObject finds the Object at remote. If it can't be found
|
|
||||||
// it returns the error ErrorObjectNotFound.
|
|
||||||
NewObject(remote string) (Object, error)
|
|
||||||
|
|
||||||
// Put in to the remote path with the modTime given of the given size
|
// Put in to the remote path with the modTime given of the given size
|
||||||
//
|
//
|
||||||
// May create the object even if it returns an error - if so
|
// May create the object even if it returns an error - if so
|
||||||
|
|
48
fs/lister.go
48
fs/lister.go
|
@ -31,13 +31,55 @@ func NewLister() *Lister {
|
||||||
return o.SetLevel(-1).SetBuffer(Config.Checkers)
|
return o.SetLevel(-1).SetBuffer(Config.Checkers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Finds and lists the files passed in
|
||||||
|
//
|
||||||
|
// Note we ignore the dir and just return all the files in the list
|
||||||
|
func (o *Lister) listFiles(f ListFser, dir string, files FilesMap) {
|
||||||
|
buffer := o.Buffer()
|
||||||
|
jobs := make(chan string, buffer)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Start some listing go routines so we find those name in parallel
|
||||||
|
wg.Add(buffer)
|
||||||
|
for i := 0; i < buffer; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for remote := range jobs {
|
||||||
|
obj, err := f.NewObject(remote)
|
||||||
|
if err == ErrorObjectNotFound {
|
||||||
|
// silently ignore files that aren't found in the files list
|
||||||
|
} else if err != nil {
|
||||||
|
o.SetError(err)
|
||||||
|
} else {
|
||||||
|
o.Add(obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pump the names in
|
||||||
|
for name := range files {
|
||||||
|
jobs <- name
|
||||||
|
if o.IsFinished() {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(jobs)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Signal that this listing is over
|
||||||
|
o.Finished()
|
||||||
|
}
|
||||||
|
|
||||||
// Start starts a go routine listing the Fs passed in. It returns the
|
// Start starts a go routine listing the Fs passed in. It returns the
|
||||||
// same Lister that was passed in for convenience.
|
// same Lister that was passed in for convenience.
|
||||||
func (o *Lister) Start(f ListFser, dir string) *Lister {
|
func (o *Lister) Start(f ListFser, dir string) *Lister {
|
||||||
o.results = make(chan listerResult, o.buffer)
|
o.results = make(chan listerResult, o.buffer)
|
||||||
go func() {
|
if o.filter != nil && o.filter.Files() != nil {
|
||||||
f.List(o, dir)
|
go o.listFiles(f, dir, o.filter.Files())
|
||||||
}()
|
} else {
|
||||||
|
go f.List(o, dir)
|
||||||
|
}
|
||||||
return o
|
return o
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -42,6 +43,10 @@ func (f *mockFs) List(o ListOpts, dir string) {
|
||||||
f.listFn(o, dir)
|
f.listFn(o, dir)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *mockFs) NewObject(remote string) (Object, error) {
|
||||||
|
return mockObject(remote), nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestListerStart(t *testing.T) {
|
func TestListerStart(t *testing.T) {
|
||||||
f := &mockFs{}
|
f := &mockFs{}
|
||||||
ranList := false
|
ranList := false
|
||||||
|
@ -56,6 +61,33 @@ func TestListerStart(t *testing.T) {
|
||||||
assert.Equal(t, true, ranList)
|
assert.Equal(t, true, ranList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListerStartWithFiles(t *testing.T) {
|
||||||
|
f := &mockFs{}
|
||||||
|
ranList := false
|
||||||
|
f.listFn = func(o ListOpts, dir string) {
|
||||||
|
ranList = true
|
||||||
|
}
|
||||||
|
filter, err := NewFilter()
|
||||||
|
require.NoError(t, err)
|
||||||
|
wantNames := []string{"potato", "sausage", "rutabaga", "carrot", "lettuce"}
|
||||||
|
sort.Strings(wantNames)
|
||||||
|
for _, name := range wantNames {
|
||||||
|
err = filter.AddFile(name)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
o := NewLister().SetFilter(filter).Start(f, "")
|
||||||
|
objs, dirs, err := o.GetAll()
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Len(t, dirs, 0)
|
||||||
|
assert.Equal(t, false, ranList)
|
||||||
|
var gotNames []string
|
||||||
|
for _, obj := range objs {
|
||||||
|
gotNames = append(gotNames, obj.Remote())
|
||||||
|
}
|
||||||
|
sort.Strings(gotNames)
|
||||||
|
assert.Equal(t, wantNames, gotNames)
|
||||||
|
}
|
||||||
|
|
||||||
func TestListerSetLevel(t *testing.T) {
|
func TestListerSetLevel(t *testing.T) {
|
||||||
o := NewLister()
|
o := NewLister()
|
||||||
o.SetLevel(1)
|
o.SetLevel(1)
|
||||||
|
|
Loading…
Reference in a new issue