vfs: factor writeback and downloaders into their own packages

This commit is contained in:
Nick Craig-Wood 2020-06-22 16:31:08 +01:00
parent 79455cc71e
commit ee04732cbb
5 changed files with 231 additions and 178 deletions

View file

@ -20,6 +20,7 @@ import (
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/vfs/vfscache/writeback"
"github.com/rclone/rclone/vfs/vfscommon"
)
@ -35,15 +36,15 @@ import (
// Cache opened files
type Cache struct {
// read only - no locking needed to read these
fremote fs.Fs // fs for the remote we are caching
fcache fs.Fs // fs for the cache directory
fcacheMeta fs.Fs // fs for the cache metadata directory
opt *vfscommon.Options // vfs Options
root string // root of the cache directory
metaRoot string // root of the cache metadata directory
hashType hash.Type // hash to use locally and remotely
hashOption *fs.HashesOption // corresponding OpenOption
writeback *writeBack // holds Items for writeback
fremote fs.Fs // fs for the remote we are caching
fcache fs.Fs // fs for the cache directory
fcacheMeta fs.Fs // fs for the cache metadata directory
opt *vfscommon.Options // vfs Options
root string // root of the cache directory
metaRoot string // root of the cache metadata directory
hashType hash.Type // hash to use locally and remotely
hashOption *fs.HashesOption // corresponding OpenOption
writeback *writeback.WriteBack // holds Items for writeback
mu sync.Mutex // protects the following variables
item map[string]*Item // files/directories in the cache
@ -88,7 +89,7 @@ func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options) (*Cache, er
item: make(map[string]*Item),
hashType: hashType,
hashOption: hashOption,
writeback: newWriteBack(ctx, opt),
writeback: writeback.New(ctx, opt),
}
// Make sure cache directories exist
@ -510,7 +511,7 @@ func (c *Cache) clean() {
}
}
c.mu.Unlock()
uploadsInProgress, uploadsQueued := c.writeback.getStats()
uploadsInProgress, uploadsQueued := c.writeback.Stats()
fs.Infof(nil, "Cleaned the cache: objects %d (was %d) in use %d, to upload %d, uploading %d, total size %v (was %v)", newItems, oldItems, totalInUse, uploadsQueued, uploadsInProgress, newUsed, oldUsed)
}

View file

@ -1,4 +1,4 @@
package vfscache
package downloaders
import (
"context"
@ -12,6 +12,7 @@ import (
"github.com/rclone/rclone/fs/chunkedreader"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/lib/ranges"
"github.com/rclone/rclone/vfs/vfscommon"
)
// FIXME implement max downloaders
@ -27,16 +28,37 @@ const (
maxErrorCount = 10
)
// downloaders is a number of downloader~s and a queue of waiters
// waiting for segments to be downloaded.
type downloaders struct {
// Item is the interface that an item to download must obey
type Item interface {
// FindMissing adjusts r returning a new ranges.Range which only
// contains the range which needs to be downloaded. This could be
// empty - check with IsEmpty. It also adjust this to make sure it is
// not larger than the file.
FindMissing(r ranges.Range) (outr ranges.Range)
// HasRange returns true if the current ranges entirely include range
HasRange(r ranges.Range) bool
// WriteAtNoOverwrite writes b to the file, but will not overwrite
// already present ranges.
//
// This is used by the downloader to write bytes to the file
//
// It returns n the total bytes processed and skipped the number of
// bytes which were processed but not actually written to the file.
WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error)
}
// Downloaders is a number of downloader~s and a queue of waiters
// waiting for segments to be downloaded to a file.
type Downloaders struct {
// Write once - no locking required
ctx context.Context
cancel context.CancelFunc
item *Item
item Item
opt *vfscommon.Options
src fs.Object // source object
remote string
fcache fs.Fs // destination Fs
wg sync.WaitGroup
// Read write
@ -57,7 +79,7 @@ type waiter struct {
// downloader represents a running download for part of a file.
type downloader struct {
// Write once
dls *downloaders // parent structure
dls *Downloaders // parent structure
quit chan struct{} // close to quit the downloader
wg sync.WaitGroup // to keep track of downloader goroutine
kick chan struct{} // kick the downloader when needed
@ -74,18 +96,19 @@ type downloader struct {
stop bool // set to true if we have called _stop()
}
func newDownloaders(item *Item, fcache fs.Fs, remote string, src fs.Object) (dls *downloaders) {
// New makes a downloader for item
func New(item Item, opt *vfscommon.Options, remote string, src fs.Object) (dls *Downloaders) {
if src == nil {
panic("internal error: newDownloaders called with nil src object")
}
ctx, cancel := context.WithCancel(context.Background())
dls = &downloaders{
dls = &Downloaders{
ctx: ctx,
cancel: cancel,
item: item,
opt: opt,
src: src,
remote: remote,
fcache: fcache,
}
dls.wg.Add(1)
go func() {
@ -114,7 +137,7 @@ func newDownloaders(item *Item, fcache fs.Fs, remote string, src fs.Object) (dls
// err is error from download
//
// call with lock held
func (dls *downloaders) _countErrors(n int64, err error) {
func (dls *Downloaders) _countErrors(n int64, err error) {
if err == nil && n != 0 {
if dls.errorCount != 0 {
fs.Infof(dls.src, "Resetting error count to 0")
@ -130,7 +153,7 @@ func (dls *downloaders) _countErrors(n int64, err error) {
}
}
func (dls *downloaders) countErrors(n int64, err error) {
func (dls *Downloaders) countErrors(n int64, err error) {
dls.mu.Lock()
dls._countErrors(n, err)
dls.mu.Unlock()
@ -139,7 +162,7 @@ func (dls *downloaders) countErrors(n int64, err error) {
// Make a new downloader, starting it to download r
//
// call with lock held
func (dls *downloaders) _newDownloader(r ranges.Range) (dl *downloader, err error) {
func (dls *Downloaders) _newDownloader(r ranges.Range) (dl *downloader, err error) {
defer log.Trace(dls.src, "r=%v", r)("err=%v", &err)
dl = &downloader{
@ -180,7 +203,7 @@ func (dls *downloaders) _newDownloader(r ranges.Range) (dl *downloader, err erro
// _removeClosed() removes any downloaders which are closed.
//
// Call with the mutex held
func (dls *downloaders) _removeClosed() {
func (dls *Downloaders) _removeClosed() {
newDownloaders := dls.dls[:0]
for _, dl := range dls.dls {
if !dl.closed() {
@ -192,7 +215,7 @@ func (dls *downloaders) _removeClosed() {
// Close all running downloaders and return any unfulfilled waiters
// with inErr
func (dls *downloaders) close(inErr error) (err error) {
func (dls *Downloaders) Close(inErr error) (err error) {
dls.mu.Lock()
defer dls.mu.Unlock()
dls._removeClosed()
@ -212,8 +235,9 @@ func (dls *downloaders) close(inErr error) (err error) {
return err
}
// Ensure a downloader is running to download r
func (dls *downloaders) ensure(r ranges.Range) (err error) {
// Download the range passed in returning when it has been downloaded
// with an error from the downloading go routine.
func (dls *Downloaders) Download(r ranges.Range) (err error) {
defer log.Trace(dls.src, "r=%+v", r)("err=%v", &err)
dls.mu.Lock()
@ -238,7 +262,7 @@ func (dls *downloaders) ensure(r ranges.Range) (err error) {
// close any waiters with the error passed in
//
// call with lock held
func (dls *downloaders) _closeWaiters(err error) {
func (dls *Downloaders) _closeWaiters(err error) {
for _, waiter := range dls.waiters {
waiter.errChan <- err
}
@ -249,7 +273,7 @@ func (dls *downloaders) _closeWaiters(err error) {
// then it starts it.
//
// call with lock held
func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) {
func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) {
// FIXME this window could be a different config var?
window := int64(fs.Config.BufferSize)
@ -258,7 +282,7 @@ func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) {
// read some stuff already.
//
// Clip r to stuff which needs downloading
r = dls.item.findMissing(r)
r = dls.item.FindMissing(r)
// If the range is entirely present then we only need to start a
// dowloader if the window isn't full.
@ -269,7 +293,7 @@ func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) {
rWindow.Size = window
}
// Clip rWindow to stuff which needs downloading
rWindow = dls.item.findMissing(rWindow)
rWindow = dls.item.FindMissing(rWindow)
// If rWindow is empty then just return without starting a
// downloader as there is no data within the window which needs
// downloading.
@ -310,9 +334,11 @@ func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) {
return err
}
// ensure a downloader is running for offset if required. If one
// isn't found then it starts it
func (dls *downloaders) ensureDownloader(r ranges.Range) (err error) {
// EnsureDownloader makes sure a downloader is running for the range
// passed in. If one isn't found then it starts it.
//
// It does not wait for the range to be downloaded
func (dls *Downloaders) EnsureDownloader(r ranges.Range) (err error) {
dls.mu.Lock()
defer dls.mu.Unlock()
return dls._ensureDownloader(r)
@ -322,14 +348,14 @@ func (dls *downloaders) ensureDownloader(r ranges.Range) (err error) {
// their callers.
//
// Call with the mutex held
func (dls *downloaders) _dispatchWaiters() {
func (dls *Downloaders) _dispatchWaiters() {
if len(dls.waiters) == 0 {
return
}
newWaiters := dls.waiters[:0]
for _, waiter := range dls.waiters {
if dls.item.hasRange(waiter.r) {
if dls.item.HasRange(waiter.r) {
waiter.errChan <- nil
} else {
newWaiters = append(newWaiters, waiter)
@ -340,7 +366,7 @@ func (dls *downloaders) _dispatchWaiters() {
// Send any waiters which have completed back to their callers and make sure
// there is a downloader appropriate for each waiter
func (dls *downloaders) kickWaiters() (err error) {
func (dls *Downloaders) kickWaiters() (err error) {
dls.mu.Lock()
defer dls.mu.Unlock()
@ -351,7 +377,7 @@ func (dls *downloaders) kickWaiters() (err error) {
}
// Make sure each waiter has a downloader
// This is an O(waiters*downloaders) algorithm
// This is an O(waiters*Downloaders) algorithm
// However the number of waiters and the number of downloaders
// are both expected to be small.
for _, waiter := range dls.waiters {
@ -421,7 +447,7 @@ func (dl *downloader) Write(p []byte) (n int, err error) {
}
}
n, skipped, err := dl.dls.item.writeAtNoOverwrite(p, dl.offset)
n, skipped, err := dl.dls.item.WriteAtNoOverwrite(p, dl.offset)
if skipped == n {
dl.skipped += int64(skipped)
} else {
@ -457,7 +483,7 @@ func (dl *downloader) open(offset int64) (err error) {
// }
// in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, fs.Config.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption)
in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.item.c.opt.ChunkSize), int64(dl.dls.item.c.opt.ChunkSizeLimit))
in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit))
_, err = in0.Seek(offset, 0)
if err != nil {
return errors.Wrap(err, "vfs reader: failed to open source file")

View file

@ -15,6 +15,8 @@ import (
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/ranges"
"github.com/rclone/rclone/vfs/vfscache/downloaders"
"github.com/rclone/rclone/vfs/vfscache/writeback"
)
// NB as Cache and Item are tightly linked it is necessary to have a
@ -47,15 +49,16 @@ type Item struct {
// read only
c *Cache // cache this is part of
mu sync.Mutex // protect the variables
name string // name in the VFS
opens int // number of times file is open
downloaders *downloaders // a record of the downloaders in action - may be nil
o fs.Object // object we are caching - may be nil
fd *os.File // handle we are using to read and write to the file
metaDirty bool // set if the info needs writeback
modified bool // set if the file has been modified since the last Open
info Info // info about the file to persist to backing store
mu sync.Mutex // protect the variables
name string // name in the VFS
opens int // number of times file is open
downloaders *downloaders.Downloaders // a record of the downloaders in action - may be nil
o fs.Object // object we are caching - may be nil
fd *os.File // handle we are using to read and write to the file
metaDirty bool // set if the info needs writeback
modified bool // set if the file has been modified since the last Open
info Info // info about the file to persist to backing store
writeBackID writeback.Handle // id of any writebacks in progress
}
@ -370,7 +373,7 @@ func (item *Item) _dirty() {
if !item.modified {
item.modified = true
item.mu.Unlock()
item.c.writeback.remove(item)
item.c.writeback.Remove(item.writeBackID)
item.mu.Lock()
}
if !item.info.Dirty {
@ -464,7 +467,7 @@ func (item *Item) Open(o fs.Object) (err error) {
// Create the downloaders
if item.o != nil {
item.downloaders = newDownloaders(item, item.c.fremote, item.name, item.o)
item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o)
}
return err
@ -523,7 +526,7 @@ func (item *Item) store(ctx context.Context, storeFn StoreFn) (err error) {
func (item *Item) Close(storeFn StoreFn) (err error) {
defer log.Trace(item.o, "Item.Close")("err=%v", &err)
var (
downloaders *downloaders
downloaders *downloaders.Downloaders
syncWriteBack = item.c.opt.WriteBack <= 0
)
item.mu.Lock()
@ -575,7 +578,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
// downloader.Write calls ensure which needs the lock
// close downloader with mutex unlocked
item.mu.Unlock()
checkErr(downloaders.close(nil))
checkErr(downloaders.Close(nil))
item.mu.Lock()
}
@ -609,8 +612,10 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
checkErr(item._store(context.Background(), storeFn))
} else {
// asynchronous writeback
item.c.writeback.SetID(&item.writeBackID)
id := item.writeBackID
item.mu.Unlock()
item.c.writeback.add(item, item.name, item.modified, func(ctx context.Context) error {
item.c.writeback.Add(id, item.name, item.modified, func(ctx context.Context) error {
return item.store(ctx, storeFn)
})
item.mu.Lock()
@ -733,7 +738,7 @@ func (item *Item) _removeMeta(reason string) {
func (item *Item) _remove(reason string) (wasWriting bool) {
// Cancel writeback, if any
item.mu.Unlock()
wasWriting = item.c.writeback.remove(item)
wasWriting = item.c.writeback.Remove(item.writeBackID)
item.mu.Lock()
item.info.clean()
item.metaDirty = false
@ -766,18 +771,18 @@ func (item *Item) present() bool {
return item._present()
}
// hasRange returns true if the current ranges entirely include range
func (item *Item) hasRange(r ranges.Range) bool {
// HasRange returns true if the current ranges entirely include range
func (item *Item) HasRange(r ranges.Range) bool {
item.mu.Lock()
defer item.mu.Unlock()
return item.info.Rs.Present(r)
}
// findMissing adjusts r returning a new ranges.Range which only
// FindMissing adjusts r returning a new ranges.Range which only
// contains the range which needs to be downloaded. This could be
// empty - check with IsEmpty. It also adjust this to make sure it is
// not larger than the file.
func (item *Item) findMissing(r ranges.Range) (outr ranges.Range) {
func (item *Item) FindMissing(r ranges.Range) (outr ranges.Range) {
item.mu.Lock()
defer item.mu.Unlock()
outr = item.info.Rs.FindMissing(r)
@ -805,12 +810,12 @@ func (item *Item) _ensure(offset, size int64) (err error) {
return nil
}
// Otherwise start the downloader for the future if required
return item.downloaders.ensureDownloader(r)
return item.downloaders.EnsureDownloader(r)
}
if item.downloaders == nil {
return errors.New("internal error: downloaders is nil")
}
return item.downloaders.ensure(r)
return item.downloaders.Download(r)
}
// _written marks the (offset, size) as present in the backing file
@ -925,14 +930,14 @@ func (item *Item) WriteAt(b []byte, off int64) (n int, err error) {
return n, err
}
// writeAtNoOverwrite writes b to the file, but will not overwrite
// WriteAtNoOverwrite writes b to the file, but will not overwrite
// already present ranges.
//
// This is used by the downloader to write bytes to the file
//
// It returns n the total bytes processed and skipped the number of
// bytes which were processed but not actually written to the file.
func (item *Item) writeAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) {
func (item *Item) WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) {
item.mu.Lock()
var (
@ -1002,11 +1007,11 @@ func (item *Item) Sync() (err error) {
// rename the item
func (item *Item) rename(name string, newName string, newObj fs.Object) (err error) {
var downloaders *downloaders
var downloaders *downloaders.Downloaders
// close downloader with mutex unlocked
defer func() {
if downloaders != nil {
_ = downloaders.close(nil)
_ = downloaders.Close(nil)
}
}()

View file

@ -1,11 +1,12 @@
// This keeps track of the files which need to be written back
package vfscache
// Package writeback keeps track of the files which need to be written
// back to storage
package writeback
import (
"container/heap"
"context"
"sync"
"sync/atomic"
"time"
"github.com/rclone/rclone/fs"
@ -17,30 +18,35 @@ const (
maxUploadDelay = 5 * time.Minute // max delay betwen upload attempts
)
// putFn is the interface that item provides to store the data
type putFn func(context.Context) error
// PutFn is the interface that item provides to store the data
type PutFn func(context.Context) error
// writeBack keeps track of the items which need to be written back to the disk at some point
type writeBack struct {
// Handle is returned for callers to keep track of writeback items
type Handle uint64
// WriteBack keeps track of the items which need to be written back to the disk at some point
type WriteBack struct {
ctx context.Context
mu sync.Mutex
items writeBackItems // priority queue of *writeBackItem - writeBackItems are in here while awaiting transfer only
lookup map[*Item]*writeBackItem // for getting a *writeBackItem from a *Item - writeBackItems are in here until cancelled
opt *vfscommon.Options // VFS options
timer *time.Timer // next scheduled time for the uploader
expiry time.Time // time the next item exires or IsZero
uploads int // number of uploads in progress
id uint64 // id of the last writeBackItem created
items writeBackItems // priority queue of *writeBackItem - writeBackItems are in here while awaiting transfer only
lookup map[Handle]*writeBackItem // for getting a *writeBackItem from a Handle - writeBackItems are in here until cancelled
opt *vfscommon.Options // VFS options
timer *time.Timer // next scheduled time for the uploader
expiry time.Time // time the next item exires or IsZero
uploads int // number of uploads in progress
// read and written with atomic
id Handle // id of the last writeBackItem created
}
// make a new writeBack
// New make a new WriteBack
//
// cancel the context to stop the background goroutine
func newWriteBack(ctx context.Context, opt *vfscommon.Options) *writeBack {
wb := &writeBack{
// cancel the context to stop the background processing
func New(ctx context.Context, opt *vfscommon.Options) *WriteBack {
wb := &WriteBack{
ctx: ctx,
items: writeBackItems{},
lookup: make(map[*Item]*writeBackItem),
lookup: make(map[Handle]*writeBackItem),
opt: opt,
}
heap.Init(&wb.items)
@ -56,15 +62,14 @@ func newWriteBack(ctx context.Context, opt *vfscommon.Options) *writeBack {
// writeBack.mu must be held to manipulate this
type writeBackItem struct {
name string // name of the item so we don't have to read it from item
id uint64 // id of the item
id Handle // id of the item
index int // index into the priority queue for update
item *Item // Item that needs writeback
expiry time.Time // When this expires we will write it back
uploading bool // True if item is being processed by upload() method
onHeap bool // true if this item is on the items heap
cancel context.CancelFunc // To cancel the upload with
done chan struct{} // closed when the cancellation completes
putFn putFn // To write the object data
putFn PutFn // To write the object data
tries int // number of times we have tried to upload
delay time.Duration // delay between upload attempts
}
@ -118,7 +123,7 @@ func (ws *writeBackItems) _update(item *writeBackItem, expiry time.Time) {
// return a new expiry time based from now until the WriteBack timeout
//
// call with lock held
func (wb *writeBack) _newExpiry() time.Time {
func (wb *WriteBack) _newExpiry() time.Time {
expiry := time.Now()
if wb.opt.WriteBack > 0 {
expiry = expiry.Add(wb.opt.WriteBack)
@ -130,14 +135,13 @@ func (wb *writeBack) _newExpiry() time.Time {
// make a new writeBackItem
//
// call with the lock held
func (wb *writeBack) _newItem(item *Item, name string) *writeBackItem {
wb.id++
func (wb *WriteBack) _newItem(id Handle, name string) *writeBackItem {
wb.SetID(&id)
wbItem := &writeBackItem{
name: name,
item: item,
expiry: wb._newExpiry(),
delay: wb.opt.WriteBack,
id: wb.id,
id: id,
}
wb._addItem(wbItem)
wb._pushItem(wbItem)
@ -147,21 +151,21 @@ func (wb *writeBack) _newItem(item *Item, name string) *writeBackItem {
// add a writeBackItem to the lookup map
//
// call with the lock held
func (wb *writeBack) _addItem(wbItem *writeBackItem) {
wb.lookup[wbItem.item] = wbItem
func (wb *WriteBack) _addItem(wbItem *writeBackItem) {
wb.lookup[wbItem.id] = wbItem
}
// delete a writeBackItem from the lookup map
//
// call with the lock held
func (wb *writeBack) _delItem(wbItem *writeBackItem) {
delete(wb.lookup, wbItem.item)
func (wb *WriteBack) _delItem(wbItem *writeBackItem) {
delete(wb.lookup, wbItem.id)
}
// pop a writeBackItem from the items heap
//
// call with the lock held
func (wb *writeBack) _popItem() (wbItem *writeBackItem) {
func (wb *WriteBack) _popItem() (wbItem *writeBackItem) {
wbItem = heap.Pop(&wb.items).(*writeBackItem)
wbItem.onHeap = false
return wbItem
@ -170,7 +174,7 @@ func (wb *writeBack) _popItem() (wbItem *writeBackItem) {
// push a writeBackItem onto the items heap
//
// call with the lock held
func (wb *writeBack) _pushItem(wbItem *writeBackItem) {
func (wb *WriteBack) _pushItem(wbItem *writeBackItem) {
if !wbItem.onHeap {
heap.Push(&wb.items, wbItem)
wbItem.onHeap = true
@ -180,7 +184,7 @@ func (wb *writeBack) _pushItem(wbItem *writeBackItem) {
// remove a writeBackItem from the items heap
//
// call with the lock held
func (wb *writeBack) _removeItem(wbItem *writeBackItem) {
func (wb *WriteBack) _removeItem(wbItem *writeBackItem) {
if wbItem.onHeap {
heap.Remove(&wb.items, wbItem.index)
wbItem.onHeap = false
@ -190,7 +194,7 @@ func (wb *writeBack) _removeItem(wbItem *writeBackItem) {
// peek the oldest writeBackItem - may be nil
//
// call with the lock held
func (wb *writeBack) _peekItem() (wbItem *writeBackItem) {
func (wb *WriteBack) _peekItem() (wbItem *writeBackItem) {
if len(wb.items) == 0 {
return nil
}
@ -198,7 +202,7 @@ func (wb *writeBack) _peekItem() (wbItem *writeBackItem) {
}
// stop the timer which runs the expiries
func (wb *writeBack) _stopTimer() {
func (wb *WriteBack) _stopTimer() {
if wb.expiry.IsZero() {
return
}
@ -211,7 +215,7 @@ func (wb *writeBack) _stopTimer() {
}
// reset the timer which runs the expiries
func (wb *writeBack) _resetTimer() {
func (wb *WriteBack) _resetTimer() {
wbItem := wb._peekItem()
if wbItem == nil {
wb._stopTimer()
@ -234,17 +238,31 @@ func (wb *writeBack) _resetTimer() {
}
}
// add adds an item to the writeback queue or resets its timer if it
// SetID sets the Handle pointed to if it is non zero to the next
// handle.
func (wb *WriteBack) SetID(pid *Handle) {
if *pid == 0 {
*pid = Handle(atomic.AddUint64((*uint64)(&wb.id), 1))
}
}
// Add adds an item to the writeback queue or resets its timer if it
// is already there.
//
// if modified is false then it it doesn't a pending upload
func (wb *writeBack) add(item *Item, name string, modified bool, putFn putFn) *writeBackItem {
// If id is 0 then a new item will always be created and the new
// Handle will be returned.
//
// Use SetID to create Handles in advance of calling Add
//
// If modified is false then it it doesn't cancel a pending upload if
// there is one as there is no need.
func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Handle {
wb.mu.Lock()
defer wb.mu.Unlock()
wbItem, ok := wb.lookup[item]
wbItem, ok := wb.lookup[id]
if !ok {
wbItem = wb._newItem(item, name)
wbItem = wb._newItem(id, name)
} else {
if wbItem.uploading && modified {
// We are uploading already so cancel the upload
@ -255,18 +273,19 @@ func (wb *writeBack) add(item *Item, name string, modified bool, putFn putFn) *w
}
wbItem.putFn = putFn
wb._resetTimer()
return wbItem
return wbItem.id
}
// Call when a file is removed. This cancels a writeback if there is
// one and doesn't return the item to the queue.
func (wb *writeBack) remove(item *Item) (found bool) {
// Remove should be called when a file should be removed from the
// writeback queue. This cancels a writeback if there is one and
// doesn't return the item to the queue.
func (wb *WriteBack) Remove(id Handle) (found bool) {
wb.mu.Lock()
defer wb.mu.Unlock()
wbItem, found := wb.lookup[item]
wbItem, found := wb.lookup[id]
if found {
fs.Debugf(wbItem.name, "vfs cache: cancelling writeback (uploading %v) %p item %p", wbItem.uploading, wbItem, wbItem.item)
fs.Debugf(wbItem.name, "vfs cache: cancelling writeback (uploading %v) %p item %d", wbItem.uploading, wbItem, wbItem.id)
if wbItem.uploading {
// We are uploading already so cancel the upload
wb._cancelUpload(wbItem)
@ -283,7 +302,7 @@ func (wb *writeBack) remove(item *Item) (found bool) {
// upload the item - called as a goroutine
//
// uploading will have been incremented here already
func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) {
func (wb *WriteBack) upload(ctx context.Context, wbItem *writeBackItem) {
wb.mu.Lock()
defer wb.mu.Unlock()
putFn := wbItem.putFn
@ -327,7 +346,7 @@ func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) {
// cancel the upload - the item should be on the heap after this returns
//
// call with lock held
func (wb *writeBack) _cancelUpload(wbItem *writeBackItem) {
func (wb *WriteBack) _cancelUpload(wbItem *writeBackItem) {
if !wbItem.uploading {
return
}
@ -351,10 +370,10 @@ func (wb *writeBack) _cancelUpload(wbItem *writeBackItem) {
// cancelUpload cancels the upload of the item if there is one in progress
//
// it returns true if there was an upload in progress
func (wb *writeBack) cancelUpload(item *Item) bool {
func (wb *WriteBack) cancelUpload(id Handle) bool {
wb.mu.Lock()
defer wb.mu.Unlock()
wbItem, ok := wb.lookup[item]
wbItem, ok := wb.lookup[id]
if !ok || !wbItem.uploading {
return false
}
@ -363,7 +382,7 @@ func (wb *writeBack) cancelUpload(item *Item) bool {
}
// this uploads as many items as possible
func (wb *writeBack) processItems(ctx context.Context) {
func (wb *WriteBack) processItems(ctx context.Context) {
wb.mu.Lock()
defer wb.mu.Unlock()
@ -397,8 +416,8 @@ func (wb *writeBack) processItems(ctx context.Context) {
}
}
// return the number of uploads in progress
func (wb *writeBack) getStats() (uploadsInProgress, uploadsQueued int) {
// Stats return the number of uploads in progress and queued
func (wb *WriteBack) Stats() (uploadsInProgress, uploadsQueued int) {
wb.mu.Lock()
defer wb.mu.Unlock()
return wb.uploads, len(wb.items)

View file

@ -1,4 +1,4 @@
package vfscache
package writeback
import (
"container/heap"
@ -15,16 +15,16 @@ import (
"github.com/stretchr/testify/assert"
)
func newTestWriteBack(t *testing.T) (wb *writeBack, cancel func()) {
func newTestWriteBack(t *testing.T) (wb *WriteBack, cancel func()) {
ctx, cancel := context.WithCancel(context.Background())
opt := vfscommon.DefaultOpt
opt.WriteBack = 100 * time.Millisecond
wb = newWriteBack(ctx, &opt)
wb = New(ctx, &opt)
return wb, cancel
}
// string for debugging - make a copy and pop the items out in order
func (wb *writeBack) string(t *testing.T) string {
func (wb *WriteBack) string(t *testing.T) string {
wb.mu.Lock()
defer wb.mu.Unlock()
ws := wb.items
@ -54,7 +54,7 @@ func TestWriteBackItems(t *testing.T) {
wbItem2 := writeBackItem{name: "two", expiry: now.Add(2 * time.Second)}
wbItem3 := writeBackItem{name: "three", expiry: now.Add(4 * time.Second)}
wb := &writeBack{
wb := &WriteBack{
items: writeBackItems{},
}
@ -80,7 +80,7 @@ func TestWriteBackItems(t *testing.T) {
assert.Equal(t, "one,two,three", wb.string(t))
}
func checkOnHeap(t *testing.T, wb *writeBack, wbItem *writeBackItem) {
func checkOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
wb.mu.Lock()
defer wb.mu.Unlock()
assert.True(t, wbItem.onHeap)
@ -92,7 +92,7 @@ func checkOnHeap(t *testing.T, wb *writeBack, wbItem *writeBackItem) {
assert.Failf(t, "expecting %q on heap", wbItem.name)
}
func checkNotOnHeap(t *testing.T, wb *writeBack, wbItem *writeBackItem) {
func checkNotOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
wb.mu.Lock()
defer wb.mu.Unlock()
assert.False(t, wbItem.onHeap)
@ -103,35 +103,34 @@ func checkNotOnHeap(t *testing.T, wb *writeBack, wbItem *writeBackItem) {
}
}
func checkInLookup(t *testing.T, wb *writeBack, wbItem *writeBackItem) {
func checkInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
wb.mu.Lock()
defer wb.mu.Unlock()
assert.Equal(t, wbItem, wb.lookup[wbItem.item])
assert.Equal(t, wbItem, wb.lookup[wbItem.id])
}
func checkNotInLookup(t *testing.T, wb *writeBack, wbItem *writeBackItem) {
func checkNotInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
wb.mu.Lock()
defer wb.mu.Unlock()
assert.Nil(t, wb.lookup[wbItem.item])
assert.Nil(t, wb.lookup[wbItem.id])
}
func TestWriteBackItemCRUD(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
item1, item2, item3 := &Item{}, &Item{}, &Item{}
// _peekItem empty
assert.Nil(t, wb._peekItem())
wbItem1 := wb._newItem(item1, "one")
wbItem1 := wb._newItem(0, "one")
checkOnHeap(t, wb, wbItem1)
checkInLookup(t, wb, wbItem1)
wbItem2 := wb._newItem(item2, "two")
wbItem2 := wb._newItem(0, "two")
checkOnHeap(t, wb, wbItem2)
checkInLookup(t, wb, wbItem2)
wbItem3 := wb._newItem(item3, "three")
wbItem3 := wb._newItem(0, "three")
checkOnHeap(t, wb, wbItem3)
checkInLookup(t, wb, wbItem3)
@ -180,7 +179,7 @@ func TestWriteBackItemCRUD(t *testing.T) {
assert.Equal(t, "one,three", wb.string(t))
}
func assertTimerRunning(t *testing.T, wb *writeBack, running bool) {
func assertTimerRunning(t *testing.T, wb *WriteBack, running bool) {
wb.mu.Lock()
if running {
assert.NotEqual(t, time.Time{}, wb.expiry)
@ -202,7 +201,7 @@ func TestWriteBackResetTimer(t *testing.T) {
// Check timer is stopped
assertTimerRunning(t, wb, false)
_ = wb._newItem(&Item{}, "three")
_ = wb._newItem(0, "three")
// Reset the timer on an queue with stuff
wb._resetTimer()
@ -230,7 +229,7 @@ func newPutItem(t *testing.T) *putItem {
}
}
// put the object as per putFn interface
// put the object as per PutFn interface
func (pi *putItem) put(ctx context.Context) (err error) {
pi.wg.Add(1)
defer pi.wg.Done()
@ -274,7 +273,7 @@ func (pi *putItem) finish(err error) {
pi.wg.Wait()
}
func waitUntilNoTransfers(t *testing.T, wb *writeBack) {
func waitUntilNoTransfers(t *testing.T, wb *WriteBack) {
for i := 0; i < 100; i++ {
wb.mu.Lock()
uploads := wb.uploads
@ -292,10 +291,15 @@ func TestWriteBackAddOK(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
item := &Item{}
pi := newPutItem(t)
wbItem := wb.add(item, "one", true, pi.put)
var inID Handle
wb.SetID(&inID)
assert.Equal(t, Handle(1), inID)
id := wb.Add(inID, "one", true, pi.put)
assert.Equal(t, inID, id)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
assert.Equal(t, "one", wb.string(t))
@ -315,10 +319,10 @@ func TestWriteBackAddFailRetry(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
item := &Item{}
pi := newPutItem(t)
wbItem := wb.add(item, "one", true, pi.put)
id := wb.Add(0, "one", true, pi.put)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
assert.Equal(t, "one", wb.string(t))
@ -348,10 +352,10 @@ func TestWriteBackAddUpdate(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
item := &Item{}
pi := newPutItem(t)
wbItem := wb.add(item, "one", true, pi.put)
id := wb.Add(0, "one", true, pi.put)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
assert.Equal(t, "one", wb.string(t))
@ -363,8 +367,8 @@ func TestWriteBackAddUpdate(t *testing.T) {
// Now the upload has started add another one
pi2 := newPutItem(t)
wbItem2 := wb.add(item, "one", true, pi2.put)
assert.Equal(t, wbItem, wbItem2)
id2 := wb.Add(id, "one", true, pi2.put)
assert.Equal(t, id, id2)
checkOnHeap(t, wb, wbItem) // object awaiting writeback time
checkInLookup(t, wb, wbItem)
@ -387,10 +391,10 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
item := &Item{}
pi := newPutItem(t)
wbItem := wb.add(item, "one", false, pi.put)
id := wb.Add(0, "one", false, pi.put)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
assert.Equal(t, "one", wb.string(t))
@ -402,8 +406,8 @@ func TestWriteBackAddUpdateNotModified(t *testing.T) {
// Now the upload has started add another one
pi2 := newPutItem(t)
wbItem2 := wb.add(item, "one", false, pi2.put)
assert.Equal(t, wbItem, wbItem2)
id2 := wb.Add(id, "one", false, pi2.put)
assert.Equal(t, id, id2)
checkNotOnHeap(t, wb, wbItem) // object still being transfered
checkInLookup(t, wb, wbItem)
@ -426,10 +430,10 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
item := &Item{}
pi := newPutItem(t)
wbItem := wb.add(item, "one", true, pi.put)
id := wb.Add(0, "one", true, pi.put)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
assert.Equal(t, "one", wb.string(t))
@ -437,8 +441,8 @@ func TestWriteBackAddUpdateNotStarted(t *testing.T) {
// Immediately add another upload before the first has started
pi2 := newPutItem(t)
wbItem2 := wb.add(item, "one", true, pi2.put)
assert.Equal(t, wbItem, wbItem2)
id2 := wb.Add(id, "one", true, pi2.put)
assert.Equal(t, id, id2)
checkOnHeap(t, wb, wbItem) // object still awaiting transfer
checkInLookup(t, wb, wbItem)
@ -464,25 +468,24 @@ func TestWriteBackGetStats(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
item := &Item{}
pi := newPutItem(t)
wb.add(item, "one", true, pi.put)
wb.Add(0, "one", true, pi.put)
inProgress, queued := wb.getStats()
inProgress, queued := wb.Stats()
assert.Equal(t, queued, 1)
assert.Equal(t, inProgress, 0)
<-pi.started
inProgress, queued = wb.getStats()
inProgress, queued = wb.Stats()
assert.Equal(t, queued, 0)
assert.Equal(t, inProgress, 1)
pi.finish(nil) // transfer successful
waitUntilNoTransfers(t, wb)
inProgress, queued = wb.getStats()
inProgress, queued = wb.Stats()
assert.Equal(t, queued, 0)
assert.Equal(t, inProgress, 0)
@ -501,10 +504,10 @@ func TestWriteBackMaxQueue(t *testing.T) {
for i := 0; i < toTransfer; i++ {
pi := newPutItem(t)
pis = append(pis, pi)
wb.add(&Item{}, fmt.Sprintf("number%d", 1), true, pi.put)
wb.Add(0, fmt.Sprintf("number%d", 1), true, pi.put)
}
inProgress, queued := wb.getStats()
inProgress, queued := wb.Stats()
assert.Equal(t, toTransfer, queued)
assert.Equal(t, 0, inProgress)
@ -516,7 +519,7 @@ func TestWriteBackMaxQueue(t *testing.T) {
// timer should be stopped now
assertTimerRunning(t, wb, false)
inProgress, queued = wb.getStats()
inProgress, queued = wb.Stats()
assert.Equal(t, toTransfer-maxTransfers, queued)
assert.Equal(t, maxTransfers, inProgress)
@ -532,7 +535,7 @@ func TestWriteBackMaxQueue(t *testing.T) {
}
waitUntilNoTransfers(t, wb)
inProgress, queued = wb.getStats()
inProgress, queued = wb.Stats()
assert.Equal(t, queued, 0)
assert.Equal(t, inProgress, 0)
}
@ -541,26 +544,26 @@ func TestWriteBackRemove(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
item := &Item{}
// cancel when not in writeback
assert.False(t, wb.remove(item))
assert.False(t, wb.Remove(1))
// add item
pi1 := newPutItem(t)
wbItem := wb.add(item, "one", true, pi1.put)
id := wb.Add(0, "one", true, pi1.put)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
// cancel when not uploading
assert.True(t, wb.remove(item))
assert.True(t, wb.Remove(id))
checkNotOnHeap(t, wb, wbItem)
checkNotInLookup(t, wb, wbItem)
assert.False(t, pi1.cancelled)
// add item
pi2 := newPutItem(t)
wbItem = wb.add(item, "one", true, pi2.put)
id = wb.Add(id, "one", true, pi2.put)
wbItem = wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
@ -568,7 +571,7 @@ func TestWriteBackRemove(t *testing.T) {
<-pi2.started
// cancel when uploading
assert.True(t, wb.remove(item))
assert.True(t, wb.Remove(id))
checkNotOnHeap(t, wb, wbItem)
checkNotInLookup(t, wb, wbItem)
assert.True(t, pi2.cancelled)
@ -578,19 +581,18 @@ func TestWriteBackCancelUpload(t *testing.T) {
wb, cancel := newTestWriteBack(t)
defer cancel()
item := &Item{}
// cancel when not in writeback
assert.False(t, wb.cancelUpload(item))
assert.False(t, wb.cancelUpload(1))
// add item
pi := newPutItem(t)
wbItem := wb.add(item, "one", true, pi.put)
id := wb.Add(0, "one", true, pi.put)
wbItem := wb.lookup[id]
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
// cancel when not uploading
assert.False(t, wb.cancelUpload(item))
assert.False(t, wb.cancelUpload(id))
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
@ -600,7 +602,7 @@ func TestWriteBackCancelUpload(t *testing.T) {
checkInLookup(t, wb, wbItem)
// cancel when uploading
assert.True(t, wb.cancelUpload(item))
assert.True(t, wb.cancelUpload(id))
checkOnHeap(t, wb, wbItem)
checkInLookup(t, wb, wbItem)
assert.True(t, pi.cancelled)