forked from TrueCloudLab/rclone
447 lines
12 KiB
Go
447 lines
12 KiB
Go
// Package writeback keeps track of the files which need to be written
|
|
// back to storage
|
|
package writeback
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/fserrors"
|
|
"github.com/rclone/rclone/vfs/vfscommon"
|
|
)
|
|
|
|
const (
	maxUploadDelay = 5 * time.Minute // max delay between upload attempts
)
|
|
|
|
// PutFn is the function that an item provides to store the data
//
// It is called with a context which is cancelled if the upload
// should be abandoned and it returns a non nil error if the data
// could not be stored.
type PutFn func(context.Context) error
|
|
|
|
// Handle is returned for callers to keep track of writeback items
//
// A zero Handle means "no Handle allocated yet" - SetID and Add
// allocate a new one when they are passed a zero Handle.
type Handle uint64
|
|
|
|
// WriteBack keeps track of the items which need to be written back to the disk at some point
|
|
type WriteBack struct {
|
|
ctx context.Context
|
|
mu sync.Mutex
|
|
items writeBackItems // priority queue of *writeBackItem - writeBackItems are in here while awaiting transfer only
|
|
lookup map[Handle]*writeBackItem // for getting a *writeBackItem from a Handle - writeBackItems are in here until cancelled
|
|
opt *vfscommon.Options // VFS options
|
|
timer *time.Timer // next scheduled time for the uploader
|
|
expiry time.Time // time the next item exires or IsZero
|
|
uploads int // number of uploads in progress
|
|
|
|
// read and written with atomic
|
|
id Handle // id of the last writeBackItem created
|
|
}
|
|
|
|
// New make a new WriteBack
|
|
//
|
|
// cancel the context to stop the background processing
|
|
func New(ctx context.Context, opt *vfscommon.Options) *WriteBack {
|
|
wb := &WriteBack{
|
|
ctx: ctx,
|
|
items: writeBackItems{},
|
|
lookup: make(map[Handle]*writeBackItem),
|
|
opt: opt,
|
|
}
|
|
heap.Init(&wb.items)
|
|
return wb
|
|
}
|
|
|
|
// writeBackItem stores an Item awaiting writeback
//
// These are stored on the items heap when awaiting transfer but
// removed from the items heap when transferring. They remain in the
// lookup map until they have been uploaded successfully or removed.
//
// writeBack.mu must be held to manipulate this
type writeBackItem struct {
	name      string             // name of the item so we don't have to read it from item
	id        Handle             // id of the item
	index     int                // index into the priority queue for update, -1 once popped off it
	expiry    time.Time          // When this expires we will write it back
	uploading bool               // True if item is being processed by upload() method
	onHeap    bool               // true if this item is on the items heap
	cancel    context.CancelFunc // To cancel the upload with
	done      chan struct{}      // closed by upload() when it has finished with the item
	putFn     PutFn              // To write the object data
	tries     int                // number of times we have tried to upload
	delay     time.Duration      // delay between upload attempts
}
|
|
|
|
// A writeBackItems implements a priority queue by implementing
// heap.Interface and holds *writeBackItem.
//
// The item with the earliest expiry is kept at index 0.
type writeBackItems []*writeBackItem
|
|
|
|
// Len returns the number of items in the queue
//
// Part of heap.Interface
func (ws writeBackItems) Len() int {
	return len(ws)
}
|
|
|
|
func (ws writeBackItems) Less(i, j int) bool {
|
|
a, b := ws[i], ws[j]
|
|
// If times are equal then use ID to disambiguate
|
|
if a.expiry.Equal(b.expiry) {
|
|
return a.id < b.id
|
|
}
|
|
return a.expiry.Before(b.expiry)
|
|
}
|
|
|
|
func (ws writeBackItems) Swap(i, j int) {
|
|
ws[i], ws[j] = ws[j], ws[i]
|
|
ws[i].index = i
|
|
ws[j].index = j
|
|
}
|
|
|
|
func (ws *writeBackItems) Push(x interface{}) {
|
|
n := len(*ws)
|
|
item := x.(*writeBackItem)
|
|
item.index = n
|
|
*ws = append(*ws, item)
|
|
}
|
|
|
|
func (ws *writeBackItems) Pop() interface{} {
|
|
old := *ws
|
|
n := len(old)
|
|
item := old[n-1]
|
|
old[n-1] = nil // avoid memory leak
|
|
item.index = -1 // for safety
|
|
*ws = old[0 : n-1]
|
|
return item
|
|
}
|
|
|
|
// update modifies the expiry of an Item in the queue.
|
|
//
|
|
// call with lock held
|
|
func (ws *writeBackItems) _update(item *writeBackItem, expiry time.Time) {
|
|
item.expiry = expiry
|
|
heap.Fix(ws, item.index)
|
|
}
|
|
|
|
// return a new expiry time based from now until the WriteBack timeout
|
|
//
|
|
// call with lock held
|
|
func (wb *WriteBack) _newExpiry() time.Time {
|
|
expiry := time.Now()
|
|
if wb.opt.WriteBack > 0 {
|
|
expiry = expiry.Add(wb.opt.WriteBack)
|
|
}
|
|
// expiry = expiry.Round(time.Millisecond)
|
|
return expiry
|
|
}
|
|
|
|
// make a new writeBackItem
|
|
//
|
|
// call with the lock held
|
|
func (wb *WriteBack) _newItem(id Handle, name string) *writeBackItem {
|
|
wb.SetID(&id)
|
|
wbItem := &writeBackItem{
|
|
name: name,
|
|
expiry: wb._newExpiry(),
|
|
delay: wb.opt.WriteBack,
|
|
id: id,
|
|
}
|
|
wb._addItem(wbItem)
|
|
wb._pushItem(wbItem)
|
|
return wbItem
|
|
}
|
|
|
|
// add a writeBackItem to the lookup map
|
|
//
|
|
// call with the lock held
|
|
func (wb *WriteBack) _addItem(wbItem *writeBackItem) {
|
|
wb.lookup[wbItem.id] = wbItem
|
|
}
|
|
|
|
// delete a writeBackItem from the lookup map
|
|
//
|
|
// call with the lock held
|
|
func (wb *WriteBack) _delItem(wbItem *writeBackItem) {
|
|
delete(wb.lookup, wbItem.id)
|
|
}
|
|
|
|
// pop a writeBackItem from the items heap
|
|
//
|
|
// call with the lock held
|
|
func (wb *WriteBack) _popItem() (wbItem *writeBackItem) {
|
|
wbItem = heap.Pop(&wb.items).(*writeBackItem)
|
|
wbItem.onHeap = false
|
|
return wbItem
|
|
}
|
|
|
|
// push a writeBackItem onto the items heap
|
|
//
|
|
// call with the lock held
|
|
func (wb *WriteBack) _pushItem(wbItem *writeBackItem) {
|
|
if !wbItem.onHeap {
|
|
heap.Push(&wb.items, wbItem)
|
|
wbItem.onHeap = true
|
|
}
|
|
}
|
|
|
|
// remove a writeBackItem from the items heap
|
|
//
|
|
// call with the lock held
|
|
func (wb *WriteBack) _removeItem(wbItem *writeBackItem) {
|
|
if wbItem.onHeap {
|
|
heap.Remove(&wb.items, wbItem.index)
|
|
wbItem.onHeap = false
|
|
}
|
|
}
|
|
|
|
// peek the oldest writeBackItem - may be nil
|
|
//
|
|
// call with the lock held
|
|
func (wb *WriteBack) _peekItem() (wbItem *writeBackItem) {
|
|
if len(wb.items) == 0 {
|
|
return nil
|
|
}
|
|
return wb.items[0]
|
|
}
|
|
|
|
// stop the timer which runs the expiries
|
|
func (wb *WriteBack) _stopTimer() {
|
|
if wb.expiry.IsZero() {
|
|
return
|
|
}
|
|
wb.expiry = time.Time{}
|
|
// fs.Debugf(nil, "resetTimer STOP")
|
|
if wb.timer != nil {
|
|
wb.timer.Stop()
|
|
wb.timer = nil
|
|
}
|
|
}
|
|
|
|
// reset the timer which runs the expiries
|
|
func (wb *WriteBack) _resetTimer() {
|
|
wbItem := wb._peekItem()
|
|
if wbItem == nil {
|
|
wb._stopTimer()
|
|
} else {
|
|
if wb.expiry.Equal(wbItem.expiry) {
|
|
return
|
|
}
|
|
wb.expiry = wbItem.expiry
|
|
dt := time.Until(wbItem.expiry)
|
|
if dt < 0 {
|
|
dt = 0
|
|
}
|
|
// fs.Debugf(nil, "resetTimer dt=%v", dt)
|
|
if wb.timer != nil {
|
|
wb.timer.Stop()
|
|
}
|
|
wb.timer = time.AfterFunc(dt, func() {
|
|
wb.processItems(wb.ctx)
|
|
})
|
|
}
|
|
}
|
|
|
|
// SetID sets the Handle pointed to if it is non zero to the next
|
|
// handle.
|
|
func (wb *WriteBack) SetID(pid *Handle) {
|
|
if *pid == 0 {
|
|
*pid = Handle(atomic.AddUint64((*uint64)(&wb.id), 1))
|
|
}
|
|
}
|
|
|
|
// Add adds an item to the writeback queue or resets its timer if it
|
|
// is already there.
|
|
//
|
|
// If id is 0 then a new item will always be created and the new
|
|
// Handle will be returned.
|
|
//
|
|
// Use SetID to create Handles in advance of calling Add
|
|
//
|
|
// If modified is false then it it doesn't cancel a pending upload if
|
|
// there is one as there is no need.
|
|
func (wb *WriteBack) Add(id Handle, name string, modified bool, putFn PutFn) Handle {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
|
|
wbItem, ok := wb.lookup[id]
|
|
if !ok {
|
|
wbItem = wb._newItem(id, name)
|
|
} else {
|
|
if wbItem.uploading && modified {
|
|
// We are uploading already so cancel the upload
|
|
wb._cancelUpload(wbItem)
|
|
}
|
|
// Kick the timer on
|
|
wb.items._update(wbItem, wb._newExpiry())
|
|
}
|
|
wbItem.putFn = putFn
|
|
wb._resetTimer()
|
|
return wbItem.id
|
|
}
|
|
|
|
// Remove should be called when a file should be removed from the
|
|
// writeback queue. This cancels a writeback if there is one and
|
|
// doesn't return the item to the queue.
|
|
func (wb *WriteBack) Remove(id Handle) (found bool) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
|
|
wbItem, found := wb.lookup[id]
|
|
if found {
|
|
fs.Debugf(wbItem.name, "vfs cache: cancelling writeback (uploading %v) %p item %d", wbItem.uploading, wbItem, wbItem.id)
|
|
if wbItem.uploading {
|
|
// We are uploading already so cancel the upload
|
|
wb._cancelUpload(wbItem)
|
|
}
|
|
// Remove the item from the heap
|
|
wb._removeItem(wbItem)
|
|
// Remove the item from the lookup map
|
|
wb._delItem(wbItem)
|
|
}
|
|
wb._resetTimer()
|
|
return found
|
|
}
|
|
|
|
// Rename should be called when a file might be uploading and it gains
|
|
// a new name. This will cancel the upload and put it back in the
|
|
// queue.
|
|
func (wb *WriteBack) Rename(id Handle, name string) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
|
|
wbItem, ok := wb.lookup[id]
|
|
if !ok {
|
|
return
|
|
}
|
|
if wbItem.uploading {
|
|
// We are uploading already so cancel the upload
|
|
wb._cancelUpload(wbItem)
|
|
}
|
|
wbItem.name = name
|
|
// Kick the timer on
|
|
wb.items._update(wbItem, wb._newExpiry())
|
|
|
|
wb._resetTimer()
|
|
}
|
|
|
|
// upload the item - called as a goroutine
//
// wb.uploads will have been incremented and wbItem.uploading set
// before this is started.
//
// The lock is released while putFn runs and is held again for the
// bookkeeping afterwards. On failure (including cancellation) the
// item is pushed back on the heap for a retry, on success it is
// removed from the lookup map. wbItem.done is closed last to tell
// anything waiting in _cancelUpload that the upload has finished.
func (wb *WriteBack) upload(ctx context.Context, wbItem *writeBackItem) {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	putFn := wbItem.putFn
	wbItem.tries++

	fs.Debugf(wbItem.name, "vfs cache: starting upload")

	// Don't hold the lock while the transfer is running - the
	// deferred Unlock above pairs with the Lock after putFn returns
	wb.mu.Unlock()
	err := putFn(ctx)
	wb.mu.Lock()

	wbItem.cancel() // cancel context to release resources since store done

	wbItem.uploading = false
	wb.uploads--

	if err != nil {
		// FIXME should this have a max number of transfer attempts?
		//
		// Double the delay for each failure up to maxUploadDelay
		//
		// NOTE(review): if wb.opt.WriteBack is 0 the delay starts
		// at 0 and doubling leaves it at 0 so failed uploads look
		// like they are retried immediately - confirm intended
		wbItem.delay *= 2
		if wbItem.delay > maxUploadDelay {
			wbItem.delay = maxUploadDelay
		}
		if _, uerr := fserrors.Cause(err); uerr == context.Canceled {
			fs.Infof(wbItem.name, "vfs cache: upload canceled")
			// Upload was cancelled so reset timer
			wbItem.delay = wb.opt.WriteBack
		} else {
			fs.Errorf(wbItem.name, "vfs cache: failed to upload try #%d, will retry in %v: %v", wbItem.tries, wbItem.delay, err)
		}
		// push the item back on the queue for retry
		wb._pushItem(wbItem)
		wb.items._update(wbItem, time.Now().Add(wbItem.delay))
	} else {
		fs.Infof(wbItem.name, "vfs cache: upload succeeded try #%d", wbItem.tries)
		// show that we are done with the item
		wb._delItem(wbItem)
	}
	wb._resetTimer()
	close(wbItem.done)
}
|
|
|
|
// cancel the upload - the item should be on the heap after this returns
//
// This does nothing if the item isn't uploading.
//
// The lock is released while waiting for the uploader to finish, so
// state protected by the lock may have changed when this returns.
//
// call with lock held
func (wb *WriteBack) _cancelUpload(wbItem *writeBackItem) {
	if !wbItem.uploading {
		return
	}
	fs.Debugf(wbItem.name, "vfs cache: cancelling upload")
	if wbItem.cancel != nil {
		// Cancel the upload - this may or may not be effective
		wbItem.cancel()
		// wait for the uploader to finish
		//
		// we need to wait without the lock otherwise the
		// background part will never run.
		wb.mu.Unlock()
		<-wbItem.done
		wb.mu.Lock()
	}
	// uploading items are not on the heap so add them back
	//
	// NOTE(review): if the upload succeeded before the cancel took
	// effect then upload() will have deleted the item from the
	// lookup map, but it is still pushed onto the heap here -
	// confirm callers cope with that
	wb._pushItem(wbItem)
	fs.Debugf(wbItem.name, "vfs cache: cancelled upload")
}
|
|
|
|
// cancelUpload cancels the upload of the item if there is one in progress
|
|
//
|
|
// it returns true if there was an upload in progress
|
|
func (wb *WriteBack) cancelUpload(id Handle) bool {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
wbItem, ok := wb.lookup[id]
|
|
if !ok || !wbItem.uploading {
|
|
return false
|
|
}
|
|
wb._cancelUpload(wbItem)
|
|
return true
|
|
}
|
|
|
|
// this uploads as many items as possible
|
|
func (wb *WriteBack) processItems(ctx context.Context) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
|
|
if wb.ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
resetTimer := true
|
|
for wbItem := wb._peekItem(); wbItem != nil && time.Until(wbItem.expiry) <= 0; wbItem = wb._peekItem() {
|
|
// If reached transfer limit don't restart the timer
|
|
if wb.uploads >= fs.Config.Transfers {
|
|
fs.Debugf(wbItem.name, "vfs cache: delaying writeback as --transfers exceeded")
|
|
resetTimer = false
|
|
break
|
|
}
|
|
// Pop the item, mark as uploading and start the uploader
|
|
wbItem = wb._popItem()
|
|
//fs.Debugf(wbItem.name, "uploading = true %p item %p", wbItem, wbItem.item)
|
|
wbItem.uploading = true
|
|
wb.uploads++
|
|
newCtx, cancel := context.WithCancel(ctx)
|
|
wbItem.cancel = cancel
|
|
wbItem.done = make(chan struct{})
|
|
go wb.upload(newCtx, wbItem)
|
|
}
|
|
|
|
if resetTimer {
|
|
wb._resetTimer()
|
|
} else {
|
|
wb._stopTimer()
|
|
}
|
|
}
|
|
|
|
// Stats return the number of uploads in progress and queued
|
|
func (wb *WriteBack) Stats() (uploadsInProgress, uploadsQueued int) {
|
|
wb.mu.Lock()
|
|
defer wb.mu.Unlock()
|
|
return wb.uploads, len(wb.items)
|
|
}
|