Refactor walk fallback to be more efficient and not regress #1854

With the current walk algorithm, it was very tricky to make sure that we
didn't regress #1854. Refactoring to start with the hint and work our
way up the tree makes the code clearer and more efficient.

Signed-off-by: James Hewitt <james.hewitt@uk.ibm.com>
This commit is contained in:
James Hewitt 2023-08-21 12:54:40 +01:00
parent e680634060
commit 409c92229c
No known key found for this signature in database
GPG key ID: EA6C3C654B6193E4
2 changed files with 140 additions and 21 deletions

View file

@ -3,6 +3,7 @@ package driver
import (
"context"
"errors"
"path/filepath"
"sort"
"strings"
@ -24,32 +25,70 @@ type WalkFn func(fileInfo FileInfo) error
// WalkFallback traverses a filesystem defined within driver, starting
// from the given path, calling f on each file. It uses the List method and Stat to drive itself.
// If the returned error from the WalkFn is ErrSkipDir and fileInfo refers
// to a directory, the directory will not be entered and Walk
// will continue the traversal. If fileInfo refers to a normal file, processing stops
// If the returned error from the WalkFn is ErrSkipDir the directory will not be entered and Walk
// will continue the traversal. If the returned error from the WalkFn is ErrFilledBuffer, the walk
// stops.
func WalkFallback(ctx context.Context, driver StorageDriver, from string, f WalkFn, options ...func(*WalkOptions)) error {
walkOptions := &WalkOptions{}
for _, o := range options {
o(walkOptions)
}
_, err := doWalkFallback(ctx, driver, from, walkOptions.StartAfterHint, f)
return err
startAfterHint := walkOptions.StartAfterHint
// Ensure that we are checking the hint is contained within from by adding a "/".
// Add to both in case the hint and form are the same, which would still count.
rel, err := filepath.Rel(from, startAfterHint)
if err != nil || strings.HasPrefix(rel, "..") {
// The startAfterHint is outside from, so check if we even need to walk anything
// Replace any path separators with \x00 so that the sort works in a depth-first way
if strings.ReplaceAll(startAfterHint, "/", "\x00") < strings.ReplaceAll(from, "/", "\x00") {
_, err := doWalkFallback(ctx, driver, from, "", f)
return err
}
} else {
// The startAfterHint is within from.
// Walk up the tree until we hit from - we know it is contained.
// Ensure startAfterHint is never deeper than a child of the base
// directory so that doWalkFallback doesn't have to worry about
// depth-first comparisons
base := startAfterHint
for strings.HasPrefix(base, from) {
_, err = doWalkFallback(ctx, driver, base, startAfterHint, f)
switch err.(type) {
case nil:
// No error
case PathNotFoundError:
// dir doesn't exist, so nothing to walk
default:
return err
}
if base == from {
break
}
startAfterHint = base
base, _ = filepath.Split(startAfterHint)
if len(base) > 1 {
base = strings.TrimSuffix(base, "/")
}
}
}
return nil
}
// doWalkFallback performs a depth first walk using recursion.
// from is the directory that this iteration of the function should walk.
// startAfterHint is the child within from to start the walk after. It should only ever be a child of from, or the empty string.
func doWalkFallback(ctx context.Context, driver StorageDriver, from string, startAfterHint string, f WalkFn) (bool, error) {
children, err := driver.List(ctx, from)
if err != nil {
return false, err
}
sort.Stable(sort.StringSlice(children))
sort.Strings(children)
for _, child := range children {
// (@jamstah) This will still walk objects that are before the startAfterHint,
// but to avoid that we would need to cache top level folders to check if there
// are any valid children then call the WalkFn for the cached entries only if
// there are, which is extra overhead. Because the hint is just a hint for
// performance reasons, its fine to walk the extra objects instead.
if child < startAfterHint && !strings.HasPrefix(startAfterHint, child) {
// The startAfterHint has been sanitised in WalkFallback and will either be
// empty, or be suitable for an <= check for this _from_.
if child <= startAfterHint {
continue
}

View file

@ -2,8 +2,8 @@ package driver
import (
"context"
"errors"
"fmt"
"strings"
"testing"
)
@ -36,7 +36,12 @@ type fileSystem struct {
}
func (cfs *fileSystem) List(_ context.Context, path string) ([]string, error) {
return cfs.fileset[path], nil
children := cfs.fileset[path]
if children == nil {
return nil, PathNotFoundError{Path: path}
} else {
return children, nil
}
}
func (cfs *fileSystem) Stat(_ context.Context, path string) (FileInfo, error) {
@ -78,8 +83,9 @@ func TestWalkFileRemoved(t *testing.T) {
func TestWalkFallback(t *testing.T) {
d := &fileSystem{
fileset: map[string][]string{
"/": {"/file1", "/folder1", "/folder2", "/folder3", "/folder4"},
"/": {"/file1", "/folder1", "/folder1-suffix", "/folder2", "/folder3", "/folder4"},
"/folder1": {"/folder1/file1"},
"/folder1-suffix": {"/folder1-suffix/file1"}, // importantly, - is before / in the ascii table
"/folder2": {"/folder2/file1"},
"/folder3": {"/folder3/subfolder1", "/folder3/subfolder2"},
"/folder3/subfolder1": {"/folder3/subfolder1/subfolder1"},
@ -106,6 +112,8 @@ func TestWalkFallback(t *testing.T) {
"/file1",
"/folder1",
"/folder1/file1",
"/folder1-suffix",
"/folder1-suffix/file1",
"/folder2",
"/folder2/file1",
"/folder3",
@ -125,7 +133,7 @@ func TestWalkFallback(t *testing.T) {
if fileInfo.Path() == "/folder1" {
return ErrSkipDir
}
if strings.Contains(fileInfo.Path(), "/folder1") {
if fileInfo.Path() == "/folder1" {
t.Fatalf("skipped dir %s and should not walk %s", "/folder1", fileInfo.Path())
}
return nil
@ -134,6 +142,8 @@ func TestWalkFallback(t *testing.T) {
"/file1",
"/folder1", // return ErrSkipDir, skip anything under /folder1
// skip /folder1/file1
"/folder1-suffix",
"/folder1-suffix/file1",
"/folder2",
"/folder2/file1",
"/folder3",
@ -147,6 +157,29 @@ func TestWalkFallback(t *testing.T) {
"/folder4/file1",
},
},
{
name: "start late expecting -suffix",
fn: noopFn,
options: []func(*WalkOptions){
WithStartAfterHint("/folder1/file1"),
},
expected: []string{
"/folder1-suffix",
"/folder1-suffix/file1",
"/folder2",
"/folder2/file1",
"/folder3",
"/folder3/subfolder1",
"/folder3/subfolder1/subfolder1",
"/folder3/subfolder1/subfolder1/file1",
"/folder3/subfolder2",
"/folder3/subfolder2/subfolder1",
"/folder3/subfolder2/subfolder1/file1",
"/folder4",
"/folder4/file1",
},
err: false,
},
{
name: "start late without from",
fn: noopFn,
@ -155,10 +188,6 @@ func TestWalkFallback(t *testing.T) {
},
expected: []string{
// start late
"/folder3",
"/folder3/subfolder1",
"/folder3/subfolder1/subfolder1",
"/folder3/subfolder1/subfolder1/file1",
"/folder3/subfolder2",
"/folder3/subfolder2/subfolder1",
"/folder3/subfolder2/subfolder1/file1",
@ -176,6 +205,47 @@ func TestWalkFallback(t *testing.T) {
},
expected: []string{
// start late
"/folder3/subfolder2",
"/folder3/subfolder2/subfolder1",
"/folder3/subfolder2/subfolder1/file1",
},
err: false,
},
{
name: "start after from",
fn: noopFn,
from: "/folder1",
options: []func(*WalkOptions){
WithStartAfterHint("/folder1-suffix"),
},
expected: []string{},
err: false,
},
{
name: "start matches from",
fn: noopFn,
from: "/folder3",
options: []func(*WalkOptions){
WithStartAfterHint("/folder3"),
},
expected: []string{
"/folder3/subfolder1",
"/folder3/subfolder1/subfolder1",
"/folder3/subfolder1/subfolder1/file1",
"/folder3/subfolder2",
"/folder3/subfolder2/subfolder1",
"/folder3/subfolder2/subfolder1/file1",
},
err: false,
},
{
name: "start doesn't exist",
fn: noopFn,
from: "/folder3",
options: []func(*WalkOptions){
WithStartAfterHint("/folder3/notafolder/notafile"),
},
expected: []string{
"/folder3/subfolder1",
"/folder3/subfolder1/subfolder1",
"/folder3/subfolder1/subfolder1/file1",
@ -200,6 +270,16 @@ func TestWalkFallback(t *testing.T) {
// stop early
},
},
{
name: "error",
fn: func(fileInfo FileInfo) error {
return errors.New("foo")
},
expected: []string{
"/file1",
},
err: true,
},
{
name: "from folder",
fn: noopFn,
@ -240,7 +320,7 @@ func compareWalked(t *testing.T, expected, walked []string) {
}
for i := range walked {
if walked[i] != expected[i] {
t.Fatalf("expected walked to come in order expected: walked %s", walked)
t.Fatalf("expected walked to come in order expected: walked %s; expected %s;", walked, expected)
}
}
}