diff --git a/vfs/file.go b/vfs/file.go index 03d03a761..ee526b937 100644 --- a/vfs/file.go +++ b/vfs/file.go @@ -527,7 +527,8 @@ func (f *File) Sync() error { } // Remove the file -func (f *File) Remove() error { +func (f *File) Remove() (err error) { + defer log.Trace(f.Path(), "")("err=%v", &err) f.mu.RLock() d := f.d f.mu.RUnlock() @@ -535,28 +536,33 @@ func (f *File) Remove() error { if d.vfs.Opt.ReadOnly { return EROFS } - f.muRW.Lock() // muRW must be locked before mu to avoid - f.mu.Lock() // deadlock in RWFileHandle.openPending and .close - if f.o != nil { - err := f.o.Remove(context.TODO()) - if err != nil { - fs.Debugf(f._path(), "File.Remove file error: %v", err) - f.mu.Unlock() - f.muRW.Unlock() - return err - } + + // Remove the object from the cache + wasWriting := false + if d.vfs.cache != nil { + wasWriting = d.vfs.cache.Remove(f.Path()) } - f.mu.Unlock() - f.muRW.Unlock() // Remove the item from the directory listing // called with File.mu released d.delObject(f.Name()) - // Remove the object from the cache - if d.vfs.cache != nil { - d.vfs.cache.Remove(f.Path()) + + f.muRW.Lock() // muRW must be locked before mu to avoid + f.mu.Lock() // deadlock in RWFileHandle.openPending and .close + if f.o != nil { + err = f.o.Remove(context.TODO()) } - return nil + f.mu.Unlock() + f.muRW.Unlock() + if err != nil { + if wasWriting { + // Ignore error deleting file if was writing it as it may not be uploaded yet + err = nil + } else { + fs.Debugf(f._path(), "File.Remove file error: %v", err) + } + } + return err } // RemoveAll the file - same as remove for files diff --git a/vfs/read_write_test.go b/vfs/read_write_test.go index 3bf713289..f7335087c 100644 --- a/vfs/read_write_test.go +++ b/vfs/read_write_test.go @@ -565,6 +565,11 @@ func TestRWFileHandleSizeCreateNew(t *testing.T) { func testRWFileHandleOpenTest(t *testing.T, vfs *VFS, test *openTest) { fileName := "open-test-file" + // Make sure we delete the file on failure too + defer func() { + _ = vfs.Remove(fileName) + }() + // first try with file not existing _, err := vfs.Stat(fileName) require.True(t, os.IsNotExist(err)) diff --git a/vfs/vfscache/cache.go b/vfs/vfscache/cache.go index 863c21050..296b6c4c5 100644 --- a/vfs/vfscache/cache.go +++ b/vfs/vfscache/cache.go @@ -309,14 +309,16 @@ func (c *Cache) Rename(name string, newName string, newObj fs.Object) (err error } // Remove should be called if name is deleted -func (c *Cache) Remove(name string) { +// +// This returns true if the file was in the transfer queue so may not +// have completedly uploaded yet. +func (c *Cache) Remove(name string) (wasWriting bool) { name = clean(name) c.mu.Lock() item, _ := c._get(name) delete(c.item, name) c.mu.Unlock() - item.remove("file deleted") - + return item.remove("file deleted") } // SetModTime should be called to set the modification time of the cache file diff --git a/vfs/vfscache/cache_test.go b/vfs/vfscache/cache_test.go index ed819d3e6..5b7e86a85 100644 --- a/vfs/vfscache/cache_test.go +++ b/vfs/vfscache/cache_test.go @@ -76,7 +76,7 @@ func newTestCache(t *testing.T) (r *fstest.Run, c *Cache, cleanup func()) { // Disable the cache cleaner as it interferes with these tests opt.CachePollInterval = 0 - // Enable synchronous write + // Disable synchronous write opt.WriteBack = 0 return newTestCacheOpt(t, opt) @@ -264,7 +264,7 @@ func TestCachePurgeOld(t *testing.T) { var removed []string removeFile := func(item *Item) { removed = append(removed, item.name) - item._remove("TestCachePurgeOld") + item.remove("TestCachePurgeOld") } removed = nil @@ -326,7 +326,7 @@ func TestCachePurgeOverQuota(t *testing.T) { var removed []string remove := func(item *Item) { removed = append(removed, item.name) - item._remove("TestCachePurgeOverQuota") + item.remove("TestCachePurgeOverQuota") } removed = nil @@ -402,7 +402,6 @@ func TestCachePurgeOverQuota(t *testing.T) { // make potato definitely after potato2 t2 := t1.Add(20 * time.Second) - require.NoError(t, potato.Truncate(5)) potato.info.ATime = t2 // Check only potato2 removed to get below quota @@ -563,13 +562,13 @@ func TestCacheCleaner(t *testing.T) { potato := c.Item("potato") potato2, found := c.get("potato") - assert.Equal(t, potato, potato2) + assert.Equal(t, fmt.Sprintf("%p", potato), fmt.Sprintf("%p", potato2)) assert.True(t, found) time.Sleep(10 * opt.CachePollInterval) potato2, found = c.get("potato") - assert.NotEqual(t, potato, potato2) + assert.NotEqual(t, fmt.Sprintf("%p", potato), fmt.Sprintf("%p", potato2)) assert.False(t, found) } diff --git a/vfs/vfscache/item.go b/vfs/vfscache/item.go index 5112d1416..132dddb9c 100644 --- a/vfs/vfscache/item.go +++ b/vfs/vfscache/item.go @@ -471,18 +471,22 @@ func (item *Item) _store(ctx context.Context, storeFn StoreFn) (err error) { // Transfer the temp file to the remote cacheObj, err := item.c.fcache.NewObject(ctx, item.name) - if err != nil { + if err != nil && err != fs.ErrorObjectNotFound { return errors.Wrap(err, "vfs cache: failed to find cache file") } - item.mu.Unlock() - o, err := operations.Copy(ctx, item.c.fremote, item.o, item.name, cacheObj) - item.mu.Lock() - if err != nil { - return errors.Wrap(err, "vfs cache: failed to transfer file from cache to remote") + // Object has disappeared if cacheObj == nil + if cacheObj != nil { + item.mu.Unlock() + o, err := operations.Copy(ctx, item.c.fremote, item.o, item.name, cacheObj) + item.mu.Lock() + if err != nil { + return errors.Wrap(err, "vfs cache: failed to transfer file from cache to remote") + } + item.o = o + item._updateFingerprint() } - item.o = o - item._updateFingerprint() + item.info.Dirty = false err = item._save() if err != nil { @@ -529,10 +533,12 @@ func (item *Item) Close(storeFn StoreFn) (err error) { } // save the metadata once more since it may be dirty // after the downloader + item.mu.Lock() saveErr := item._save() if saveErr != nil && err == nil { err = errors.Wrap(saveErr, "close failed to save item") } + item.mu.Unlock() }() item.mu.Lock() defer item.mu.Unlock() @@ -582,7 +588,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) { err = item._store(context.Background(), storeFn) } else { // asynchronous writeback - item.c.writeback.add(item, storeFn) + item.c.writeback.add(item, item.name, storeFn) } } @@ -666,10 +672,10 @@ func (item *Item) _removeFile(reason string) { err := os.Remove(osPath) if err != nil { if !os.IsNotExist(err) { - fs.Errorf(item.name, "Failed to remove cache file as %s: %v", reason, err) + fs.Errorf(item.name, "vfs cache: failed to remove cache file as %s: %v", reason, err) } } else { - fs.Infof(item.name, "Removed cache file as %s", reason) + fs.Infof(item.name, "vfs cache: removed cache file as %s", reason) } } @@ -681,28 +687,37 @@ func (item *Item) _removeMeta(reason string) { err := os.Remove(osPathMeta) if err != nil { if !os.IsNotExist(err) { - fs.Errorf(item.name, "Failed to remove metadata from cache as %s: %v", reason, err) + fs.Errorf(item.name, "vfs cache: failed to remove metadata from cache as %s: %v", reason, err) } } else { - fs.Infof(item.name, "Removed metadata from cache as %s", reason) + fs.Infof(item.name, "vfs cache: removed metadata from cache as %s", reason) } } // remove the cached file and empty the metadata // +// This returns true if the file was in the transfer queue so may not +// have completedly uploaded yet. +// // call with lock held -func (item *Item) _remove(reason string) { +func (item *Item) _remove(reason string) (wasWriting bool) { + // Cancel writeback, if any + wasWriting = item.c.writeback.cancel(item) item.info.clean() item.metaDirty = false item._removeFile(reason) item._removeMeta(reason) + return wasWriting } // remove the cached file and empty the metadata -func (item *Item) remove(reason string) { +// +// This returns true if the file was in the transfer queue so may not +// have completedly uploaded yet. +func (item *Item) remove(reason string) (wasWriting bool) { item.mu.Lock() - item._remove(reason) - item.mu.Unlock() + defer item.mu.Unlock() + return item._remove(reason) } // create a downloader for the item diff --git a/vfs/vfscache/writeback.go b/vfs/vfscache/writeback.go index 8573228b5..3d02a8237 100644 --- a/vfs/vfscache/writeback.go +++ b/vfs/vfscache/writeback.go @@ -13,15 +13,14 @@ import ( ) const ( - uploadDelay = 10 * time.Second // delay betwen upload attempts - maxUploadAttempts = 10 // max number of times to try to upload + maxUploadDelay = 5 * time.Minute // max delay betwen upload attempts ) // writeBack keeps track of the items which need to be written back to the disk at some point type writeBack struct { mu sync.Mutex - items writeBackItems // priority queue of *writeBackItem - lookup map[*Item]*writeBackItem // for getting a *writeBackItem from a *Item + items writeBackItems // priority queue of *writeBackItem - writeBackItems are in here while awaiting transfer only + lookup map[*Item]*writeBackItem // for getting a *writeBackItem from a *Item - writeBackItems are in here until cancelled opt *vfscommon.Options // VFS options timer *time.Timer // next scheduled time for the uploader kick chan struct{} // send on this channel to wake up the uploader @@ -47,13 +46,19 @@ func newWriteBack(ctx context.Context, opt *vfscommon.Options) *writeBack { // writeBackItem stores an Item awaiting writeback // +// These are stored on the items heap when awaiting transfer but +// removed from the items heap when transferring. They remain in the +// lookup map until cancelled. +// // writeBack.mu must be held to manipulate this type writeBackItem struct { + name string // name of the item so we don't have to read it from item index int // index into the priority queue for update item *Item // Item that needs writeback expiry time.Time // When this expires we will write it back uploading bool // If we are uploading the item cancel context.CancelFunc // To cancel the upload with + done chan struct{} // closed when the cancellation completes storeFn StoreFn // To write the object back with tries int // number of times we have tried to upload delay time.Duration // delay between upload attempts @@ -114,11 +119,12 @@ func (wb *writeBack) _newExpiry() time.Time { // make a new writeBackItem // // call with the lock held -func (wb *writeBack) _newItem(item *Item) *writeBackItem { +func (wb *writeBack) _newItem(item *Item, name string) *writeBackItem { wbItem := &writeBackItem{ + name: name, item: item, expiry: wb._newExpiry(), - delay: uploadDelay, + delay: wb.opt.WriteBack, } wb._addItem(wbItem) wb._pushItem(wbItem) @@ -153,6 +159,13 @@ func (wb *writeBack) _pushItem(wbItem *writeBackItem) { heap.Push(&wb.items, wbItem) } +// remove a writeBackItem from the items heap +// +// call with the lock held +func (wb *writeBack) _removeItem(wbItem *writeBackItem) { + heap.Remove(&wb.items, wbItem.index) +} + // peek the oldest writeBackItem - may be nil // // call with the lock held @@ -179,13 +192,13 @@ func (wb *writeBack) _resetTimer() { // add adds an item to the writeback queue or resets its timer if it // is already there -func (wb *writeBack) add(item *Item, storeFn StoreFn) { +func (wb *writeBack) add(item *Item, name string, storeFn StoreFn) { wb.mu.Lock() defer wb.mu.Unlock() wbItem, ok := wb.lookup[item] if !ok { - wbItem = wb._newItem(item) + wbItem = wb._newItem(item, name) } else { if wbItem.uploading { // We are uploading already so cancel the upload @@ -198,6 +211,28 @@ func (wb *writeBack) add(item *Item, storeFn StoreFn) { wb._resetTimer() } +// cancel a writeback if there is one +func (wb *writeBack) cancel(item *Item) (found bool) { + wb.mu.Lock() + defer wb.mu.Unlock() + + wbItem, found := wb.lookup[item] + if found { + fs.Debugf(wbItem.name, "vfs cache: cancelling writeback") + if wbItem.uploading { + // We are uploading already so cancel the upload + wb._cancelUpload(wbItem) + } else { + // Remove the item from the heap + wb._removeItem(wbItem) + } + // Remove the item from the lookup map + wb._delItem(wbItem) + } + wb._resetTimer() + return found +} + // kick the upload checker // // This should be called at the end of uploads just in case we had to @@ -229,20 +264,22 @@ func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) { } if err != nil { - if wbItem.tries < maxUploadAttempts { - fs.Errorf(item.getName(), "vfs cache: failed to upload, will retry in %v: %v", wb.opt.WriteBack, err) - // push the item back on the queue for retry - wb._pushItem(wbItem) - wb.items._update(wbItem, time.Now().Add(wbItem.delay)) - wbItem.delay *= 2 - } else { - fs.Errorf(item.getName(), "vfs cache: failed to upload, will retry in %v: %v", wb.opt.WriteBack, err) + // FIXME should this have a max number of transfer attempts? + wbItem.delay *= 2 + if wbItem.delay > maxUploadDelay { + wbItem.delay = maxUploadDelay } + fs.Errorf(wbItem.name, "vfs cache: failed to upload try #%d, will retry in %v: %v", wbItem.tries, wbItem.delay, err) + // push the item back on the queue for retry + wb._pushItem(wbItem) + wb.items._update(wbItem, time.Now().Add(wbItem.delay)) } else { + fs.Infof(wbItem.name, "vfs cache: upload succeeded try #%d", wbItem.tries) // show that we are done with the item wb._delItem(wbItem) } wb._kickUploader() + close(wbItem.done) } // cancel the upload @@ -252,11 +289,12 @@ func (wb *writeBack) _cancelUpload(wbItem *writeBackItem) { if !wbItem.uploading { return } - fs.Debugf(wbItem.item.getName(), "vfs cache: canceling upload") + fs.Debugf(wbItem.name, "vfs cache: cancelling upload") if wbItem.cancel != nil { // Cancel the upload - this may or may not be effective - // we don't wait for the completion wbItem.cancel() + // wait for the uploader to finish + <-wbItem.done } if wbItem.uploading { wbItem.uploading = false @@ -275,7 +313,7 @@ func (wb *writeBack) processItems(ctx context.Context) { for wbItem := wb._peekItem(); wbItem != nil && time.Until(wbItem.expiry) <= 0; wbItem = wb._peekItem() { // If reached transfer limit don't restart the timer if wb.uploads >= fs.Config.Transfers { - fs.Debugf(wbItem.item.getName(), "vfs cache: delaying writeback as --transfers exceeded") + fs.Debugf(wbItem.name, "vfs cache: delaying writeback as --transfers exceeded") resetTimer = false break } @@ -286,6 +324,7 @@ func (wb *writeBack) processItems(ctx context.Context) { wb.uploads++ newCtx, cancel := context.WithCancel(ctx) wbItem.cancel = cancel + wbItem.done = make(chan struct{}) go wb.upload(newCtx, wbItem) }