forked from TrueCloudLab/rclone
vfs: stop virtual directory entries dropping out of the directory cache
Rclone adds virtual directory entries to the directory cache when it creates a file or directory. Before this change these dropped out of the directory cache when the directory cache was reloaded. This meant that when the directory cache expired: - On bucket based backends, empty directories would disappear - When using VFS writeback, files in the process of uploading would disappear This is fixed by keeping track of the virtual entries in each directory. The virtual entries are removed when they become real - ie the object is read back from the listing. This also keeps tracks of deletes in the same way so if a file is deleted, it will not re-appear when the directory cache is reloaded if the deletion hasn't finished yet.
This commit is contained in:
parent
143abe39f2
commit
06a12f5e27
3 changed files with 176 additions and 8 deletions
111
vfs/dir.go
111
vfs/dir.go
|
@ -32,9 +32,21 @@ type Dir struct {
|
||||||
entry fs.Directory
|
entry fs.Directory
|
||||||
read time.Time // time directory entry last read
|
read time.Time // time directory entry last read
|
||||||
items map[string]Node // directory entries - can be empty but not nil
|
items map[string]Node // directory entries - can be empty but not nil
|
||||||
|
virtual map[string]vState // virtual directory entries - may be nil
|
||||||
sys interface{} // user defined info to be attached here
|
sys interface{} // user defined info to be attached here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//go:generate stringer -type=vState
|
||||||
|
|
||||||
|
// vState describes the state of the virtual directory entries
|
||||||
|
type vState byte
|
||||||
|
|
||||||
|
const (
|
||||||
|
vOK vState = iota // Not virtual
|
||||||
|
vAdd // added file or directory
|
||||||
|
vDel // removed file or directory
|
||||||
|
)
|
||||||
|
|
||||||
func newDir(vfs *VFS, f fs.Fs, parent *Dir, fsDir fs.Directory) *Dir {
|
func newDir(vfs *VFS, f fs.Fs, parent *Dir, fsDir fs.Directory) *Dir {
|
||||||
return &Dir{
|
return &Dir{
|
||||||
vfs: vfs,
|
vfs: vfs,
|
||||||
|
@ -48,7 +60,7 @@ func newDir(vfs *VFS, f fs.Fs, parent *Dir, fsDir fs.Directory) *Dir {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// String converts it to printablee
|
// String converts it to printable
|
||||||
func (d *Dir) String() string {
|
func (d *Dir) String() string {
|
||||||
if d == nil {
|
if d == nil {
|
||||||
return "<nil *Dir>"
|
return "<nil *Dir>"
|
||||||
|
@ -126,7 +138,12 @@ func (d *Dir) forgetDirPath(relativePath string) {
|
||||||
// this is called with the mutex held
|
// this is called with the mutex held
|
||||||
fs.Debugf(dir.path, "forgetting directory cache")
|
fs.Debugf(dir.path, "forgetting directory cache")
|
||||||
dir.read = time.Time{}
|
dir.read = time.Time{}
|
||||||
|
// Don't clear directory entries if there are virtual
|
||||||
|
// items in there.
|
||||||
|
if len(dir.virtual) == 0 {
|
||||||
dir.items = make(map[string]Node)
|
dir.items = make(map[string]Node)
|
||||||
|
dir.virtual = nil
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -139,7 +156,7 @@ func (d *Dir) ForgetAll() {
|
||||||
d.forgetDirPath("")
|
d.forgetDirPath("")
|
||||||
}
|
}
|
||||||
|
|
||||||
// invalidateDir invalidates the directory cache for absPath relative to this dir
|
// invalidateDir invalidates the directory cache for absPath relative to the root
|
||||||
func (d *Dir) invalidateDir(absPath string) {
|
func (d *Dir) invalidateDir(absPath string) {
|
||||||
node := d.vfs.root.cachedNode(absPath)
|
node := d.vfs.root.cachedNode(absPath)
|
||||||
if dir, ok := node.(*Dir); ok {
|
if dir, ok := node.(*Dir); ok {
|
||||||
|
@ -252,20 +269,77 @@ func (d *Dir) rename(newParent *Dir, fsDir fs.Directory) {
|
||||||
|
|
||||||
// addObject adds a new object or directory to the directory
|
// addObject adds a new object or directory to the directory
|
||||||
//
|
//
|
||||||
|
// The name passed in is marked as virtual as it hasn't been read from a remote
|
||||||
|
// directory listing.
|
||||||
|
//
|
||||||
// note that we add new objects rather than updating old ones
|
// note that we add new objects rather than updating old ones
|
||||||
func (d *Dir) addObject(node Node) {
|
func (d *Dir) addObject(node Node) {
|
||||||
d.mu.Lock()
|
d.mu.Lock()
|
||||||
d.items[node.Name()] = node
|
leaf := node.Name()
|
||||||
|
d.items[leaf] = node
|
||||||
|
if d.virtual == nil {
|
||||||
|
d.virtual = make(map[string]vState)
|
||||||
|
}
|
||||||
|
d.virtual[leaf] = vAdd
|
||||||
|
fs.Debugf(d.path, "Added virtual directory entry %v: %q", vAdd, leaf)
|
||||||
d.mu.Unlock()
|
d.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddVirtual adds a virtual object of name and size to the directory
|
||||||
|
//
|
||||||
|
// This will be replaced with a real object when it is read back from the
|
||||||
|
// remote.
|
||||||
|
//
|
||||||
|
// This is used to add directory entries while things are uploading
|
||||||
|
func (d *Dir) AddVirtual(leaf string, size int64, isDir bool) {
|
||||||
|
var node Node
|
||||||
|
d.mu.RLock()
|
||||||
|
dPath := d.path
|
||||||
|
_, found := d.items[leaf]
|
||||||
|
d.mu.RUnlock()
|
||||||
|
if found {
|
||||||
|
// Don't overwrite existing objects
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if isDir {
|
||||||
|
remote := path.Join(dPath, leaf)
|
||||||
|
entry := fs.NewDir(remote, time.Now())
|
||||||
|
node = newDir(d.vfs, d.f, d, entry)
|
||||||
|
} else {
|
||||||
|
f := newFile(d, dPath, nil, leaf)
|
||||||
|
f.setSize(size)
|
||||||
|
node = f
|
||||||
|
}
|
||||||
|
d.addObject(node)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// delObject removes an object from the directory
|
// delObject removes an object from the directory
|
||||||
|
//
|
||||||
|
// The name passed in is marked as virtual as the delete it hasn't been read
|
||||||
|
// from a remote directory listing.
|
||||||
func (d *Dir) delObject(leaf string) {
|
func (d *Dir) delObject(leaf string) {
|
||||||
d.mu.Lock()
|
d.mu.Lock()
|
||||||
delete(d.items, leaf)
|
delete(d.items, leaf)
|
||||||
|
if d.virtual == nil {
|
||||||
|
d.virtual = make(map[string]vState)
|
||||||
|
}
|
||||||
|
d.virtual[leaf] = vDel
|
||||||
|
fs.Debugf(d.path, "Added virtual directory entry %v: %q", vDel, leaf)
|
||||||
d.mu.Unlock()
|
d.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DelVirtual removes an object from the directory listing
|
||||||
|
//
|
||||||
|
// It marks it as removed until it has confirmed the object is missing when the
|
||||||
|
// directory entries are re-read in which case the virtual mark is removed.
|
||||||
|
//
|
||||||
|
// This is used to remove directory entries after things have been deleted or
|
||||||
|
// renamed but before we've had confirmation from the backend.
|
||||||
|
func (d *Dir) DelVirtual(leaf string) {
|
||||||
|
d.delObject(leaf)
|
||||||
|
}
|
||||||
|
|
||||||
// read the directory and sets d.items - must be called with the lock held
|
// read the directory and sets d.items - must be called with the lock held
|
||||||
func (d *Dir) _readDir() error {
|
func (d *Dir) _readDir() error {
|
||||||
when := time.Now()
|
when := time.Now()
|
||||||
|
@ -312,6 +386,21 @@ func (d *Dir) _readDirFromEntries(entries fs.DirEntries, dirTree dirtree.DirTree
|
||||||
}
|
}
|
||||||
node := d.items[name]
|
node := d.items[name]
|
||||||
found[name] = struct{}{}
|
found[name] = struct{}{}
|
||||||
|
virtualState := d.virtual[name]
|
||||||
|
switch virtualState {
|
||||||
|
case vAdd:
|
||||||
|
// item was added to the dir but since it is found in a
|
||||||
|
// listing is no longer virtual
|
||||||
|
delete(d.virtual, name)
|
||||||
|
if len(d.virtual) == 0 {
|
||||||
|
d.virtual = nil
|
||||||
|
}
|
||||||
|
fs.Debugf(d.path, "Removed virtual directory entry %v: %q", virtualState, name)
|
||||||
|
case vDel:
|
||||||
|
// item is deleted from the dir so skip it
|
||||||
|
continue
|
||||||
|
case vOK:
|
||||||
|
}
|
||||||
switch item := entry.(type) {
|
switch item := entry.(type) {
|
||||||
case fs.Object:
|
case fs.Object:
|
||||||
obj := item
|
obj := item
|
||||||
|
@ -349,10 +438,24 @@ func (d *Dir) _readDirFromEntries(entries fs.DirEntries, dirTree dirtree.DirTree
|
||||||
}
|
}
|
||||||
// delete unused entries
|
// delete unused entries
|
||||||
for name := range d.items {
|
for name := range d.items {
|
||||||
if _, ok := found[name]; !ok {
|
if _, ok := found[name]; !ok && d.virtual[name] != vAdd {
|
||||||
|
// item was added to the dir but wasn't found in the
|
||||||
|
// listing - remove it unless it was virtually added
|
||||||
delete(d.items, name)
|
delete(d.items, name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// delete unused virtuals
|
||||||
|
for name, virtualState := range d.virtual {
|
||||||
|
if _, ok := found[name]; !ok && virtualState == vDel {
|
||||||
|
// We have a virtual delete but the item wasn't found in
|
||||||
|
// the listing so no longer needs a virtual delete.
|
||||||
|
delete(d.virtual, name)
|
||||||
|
fs.Debugf(d.path, "Removed virtual directory entry %v: %q", virtualState, name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(d.virtual) == 0 {
|
||||||
|
d.virtual = nil
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
|
"github.com/rclone/rclone/fs/operations"
|
||||||
"github.com/rclone/rclone/fstest"
|
"github.com/rclone/rclone/fstest"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -277,6 +278,45 @@ func TestDirReadDirAll(t *testing.T) {
|
||||||
dir = node.(*Dir)
|
dir = node.(*Dir)
|
||||||
|
|
||||||
checkListing(t, dir, []string{"file3,16,false"})
|
checkListing(t, dir, []string{"file3,16,false"})
|
||||||
|
|
||||||
|
t.Run("Virtual", func(t *testing.T) {
|
||||||
|
node, err := vfs.Stat("dir")
|
||||||
|
require.NoError(t, err)
|
||||||
|
dir := node.(*Dir)
|
||||||
|
|
||||||
|
// Add some virtual entries and check what happens
|
||||||
|
dir.AddVirtual("virtualFile", 17, false)
|
||||||
|
dir.AddVirtual("virtualDir", 0, true)
|
||||||
|
// Remove some existing entries
|
||||||
|
dir.DelVirtual("file2")
|
||||||
|
dir.DelVirtual("subdir")
|
||||||
|
|
||||||
|
checkListing(t, dir, []string{"file1,14,false", "virtualDir,0,true", "virtualFile,17,false"})
|
||||||
|
|
||||||
|
// Force a directory reload...
|
||||||
|
dir.invalidateDir("dir")
|
||||||
|
|
||||||
|
checkListing(t, dir, []string{"file1,14,false", "virtualDir,0,true", "virtualFile,17,false"})
|
||||||
|
|
||||||
|
// Now action the deletes and uploads
|
||||||
|
_ = r.WriteObject(context.Background(), "dir/virtualFile", "virtualFile contents", t1)
|
||||||
|
_ = r.WriteObject(context.Background(), "dir/virtualDir/testFile", "testFile contents", t1)
|
||||||
|
o, err := r.Fremote.NewObject(context.Background(), "dir/file2")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, o.Remove(context.Background()))
|
||||||
|
require.NoError(t, operations.Purge(context.Background(), r.Fremote, "dir/subdir"))
|
||||||
|
|
||||||
|
// Force a directory reload...
|
||||||
|
dir.invalidateDir("dir")
|
||||||
|
|
||||||
|
checkListing(t, dir, []string{"file1,14,false", "virtualDir,0,true", "virtualFile,20,false"})
|
||||||
|
|
||||||
|
// check no virtuals left
|
||||||
|
dir.mu.Lock()
|
||||||
|
assert.Nil(t, dir.virtual)
|
||||||
|
dir.mu.Unlock()
|
||||||
|
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDirOpen(t *testing.T) {
|
func TestDirOpen(t *testing.T) {
|
||||||
|
|
25
vfs/vstate_string.go
Normal file
25
vfs/vstate_string.go
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
// Code generated by "stringer -type=vState"; DO NOT EDIT.
|
||||||
|
|
||||||
|
package vfs
|
||||||
|
|
||||||
|
import "strconv"
|
||||||
|
|
||||||
|
func _() {
|
||||||
|
// An "invalid array index" compiler error signifies that the constant values have changed.
|
||||||
|
// Re-run the stringer command to generate them again.
|
||||||
|
var x [1]struct{}
|
||||||
|
_ = x[vOK-0]
|
||||||
|
_ = x[vAdd-1]
|
||||||
|
_ = x[vDel-2]
|
||||||
|
}
|
||||||
|
|
||||||
|
const _vState_name = "vOKvAddvDel"
|
||||||
|
|
||||||
|
var _vState_index = [...]uint8{0, 3, 7, 11}
|
||||||
|
|
||||||
|
func (i vState) String() string {
|
||||||
|
if i >= vState(len(_vState_index)-1) {
|
||||||
|
return "vState(" + strconv.FormatInt(int64(i), 10) + ")"
|
||||||
|
}
|
||||||
|
return _vState_name[_vState_index[i]:_vState_index[i+1]]
|
||||||
|
}
|
Loading…
Reference in a new issue